oci_distribution/
client.rs

1//! OCI distribution client
2//!
3//! *Note*: This client is very feature poor. We hope to expand this to be a complete
4//! OCI distribution client in the future.
5
6use crate::config::ConfigFile;
7use crate::errors::*;
8use crate::manifest::{
9    ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
10    IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
11    IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
12    OCI_IMAGE_MEDIA_TYPE,
13};
14use crate::secrets::RegistryAuth;
15use crate::secrets::*;
16use crate::sha256_digest;
17use crate::Reference;
18
19use crate::errors::{OciDistributionError, Result};
20use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
21use futures_util::future;
22use futures_util::stream::{self, StreamExt, TryStreamExt};
23use futures_util::Stream;
24use http::HeaderValue;
25use http_auth::{parser::ChallengeParser, ChallengeRef};
26use olpc_cjson::CanonicalFormatter;
27use reqwest::header::HeaderMap;
28use reqwest::{RequestBuilder, Url};
29use serde::Deserialize;
30use serde::Serialize;
31use sha2::Digest;
32use std::collections::HashMap;
33use std::convert::TryFrom;
34use std::sync::Arc;
35use tokio::io::{AsyncWrite, AsyncWriteExt};
36use tokio::sync::RwLock;
37use tracing::{debug, trace, warn};
38
39const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
40    IMAGE_MANIFEST_MEDIA_TYPE,
41    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
42    OCI_IMAGE_MEDIA_TYPE,
43    OCI_IMAGE_INDEX_MEDIA_TYPE,
44];
45
46const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;
47
48/// Default value for `ClientConfig::max_concurrent_upload`
49pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
50
51/// Default value for `ClientConfig::max_concurrent_download`
52pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
53
54/// The data for an image or module.
55#[derive(Clone)]
56pub struct ImageData {
57    /// The layers of the image or module.
58    pub layers: Vec<ImageLayer>,
59    /// The digest of the image or module.
60    pub digest: Option<String>,
61    /// The Configuration object of the image or module.
62    pub config: Config,
63    /// The manifest of the image or module.
64    pub manifest: Option<OciImageManifest>,
65}
66
67/// The data returned by an OCI registry after a successful push
68/// operation is completed
69pub struct PushResponse {
70    /// Pullable url for the config
71    pub config_url: String,
72    /// Pullable url for the manifest
73    pub manifest_url: String,
74}
75
76/// The data returned by a successful tags/list Request
77#[derive(Deserialize, Debug)]
78pub struct TagResponse {
79    /// Repository Name
80    pub name: String,
81    /// List of existing Tags
82    pub tags: Vec<String>,
83}
84
85/// The data and media type for an image layer
86#[derive(Clone)]
87pub struct ImageLayer {
88    /// The data of this layer
89    pub data: Vec<u8>,
90    /// The media type of this layer
91    pub media_type: String,
92    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
93    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
94    pub annotations: Option<HashMap<String, String>>,
95}
96
97impl ImageLayer {
98    /// Constructs a new ImageLayer struct with provided data and media type
99    pub fn new(
100        data: Vec<u8>,
101        media_type: String,
102        annotations: Option<HashMap<String, String>>,
103    ) -> Self {
104        ImageLayer {
105            data,
106            media_type,
107            annotations,
108        }
109    }
110
111    /// Constructs a new ImageLayer struct with provided data and
112    /// media type application/vnd.oci.image.layer.v1.tar
113    pub fn oci_v1(data: Vec<u8>, annotations: Option<HashMap<String, String>>) -> Self {
114        Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
115    }
116    /// Constructs a new ImageLayer struct with provided data and
117    /// media type application/vnd.oci.image.layer.v1.tar+gzip
118    pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<HashMap<String, String>>) -> Self {
119        Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
120    }
121
122    /// Helper function to compute the sha256 digest of an image layer
123    pub fn sha256_digest(&self) -> String {
124        sha256_digest(&self.data)
125    }
126}
127
128/// The data and media type for a configuration object
129#[derive(Clone)]
130pub struct Config {
131    /// The data of this config object
132    pub data: Vec<u8>,
133    /// The media type of this object
134    pub media_type: String,
135    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
136    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
137    pub annotations: Option<HashMap<String, String>>,
138}
139
140impl Config {
141    /// Constructs a new Config struct with provided data and media type
142    pub fn new(
143        data: Vec<u8>,
144        media_type: String,
145        annotations: Option<HashMap<String, String>>,
146    ) -> Self {
147        Config {
148            data,
149            media_type,
150            annotations,
151        }
152    }
153
154    /// Constructs a new Config struct with provided data and
155    /// media type application/vnd.oci.image.config.v1+json
156    pub fn oci_v1(data: Vec<u8>, annotations: Option<HashMap<String, String>>) -> Self {
157        Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
158    }
159
160    /// Construct a new Config struct with provided [`ConfigFile`] and
161    /// media type `application/vnd.oci.image.config.v1+json`
162    pub fn oci_v1_from_config_file(
163        config_file: ConfigFile,
164        annotations: Option<HashMap<String, String>>,
165    ) -> Result<Self> {
166        let data = serde_json::to_vec(&config_file)?;
167        Ok(Self::new(
168            data,
169            IMAGE_CONFIG_MEDIA_TYPE.to_string(),
170            annotations,
171        ))
172    }
173
174    /// Helper function to compute the sha256 digest of this config object
175    pub fn sha256_digest(&self) -> String {
176        sha256_digest(&self.data)
177    }
178}
179
180impl TryFrom<Config> for ConfigFile {
181    type Error = crate::errors::OciDistributionError;
182
183    fn try_from(config: Config) -> Result<Self> {
184        let config = String::from_utf8(config.data)
185            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
186        let config_file: ConfigFile = serde_json::from_str(&config)
187            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
188        Ok(config_file)
189    }
190}
191
192/// The OCI client connects to an OCI registry and fetches OCI images.
193///
194/// An OCI registry is a container registry that adheres to the OCI Distribution
195/// specification. DockerHub is one example, as are ACR and GCR. This client
196/// provides a native Rust implementation for pulling OCI images.
197///
198/// Some OCI registries support completely anonymous access. But most require
199/// at least an Oauth2 handshake. Typlically, you will want to create a new
200/// client, and then run the `auth()` method, which will attempt to get
201/// a read-only bearer token. From there, pulling images can be done with
202/// the `pull_*` functions.
203///
204/// For true anonymous access, you can skip `auth()`. This is not recommended
205/// unless you are sure that the remote registry does not require Oauth2.
206#[derive(Clone)]
207pub struct Client {
208    config: Arc<ClientConfig>,
209    // Registry -> RegistryAuth
210    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
211    tokens: TokenCache,
212    client: reqwest::Client,
213    push_chunk_size: usize,
214}
215
216impl Default for Client {
217    fn default() -> Self {
218        Self {
219            config: Arc::default(),
220            auth_store: Arc::default(),
221            tokens: TokenCache::default(),
222            client: reqwest::Client::default(),
223            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
224        }
225    }
226}
227
228/// A source that can provide a `ClientConfig`.
229/// If you are using this crate in your own application, you can implement this
230/// trait on your configuration type so that it can be passed to `Client::from_source`.
231pub trait ClientConfigSource {
232    /// Provides a `ClientConfig`.
233    fn client_config(&self) -> ClientConfig;
234}
235
236impl TryFrom<ClientConfig> for Client {
237    type Error = OciDistributionError;
238
239    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
240        #[allow(unused_mut)]
241        let mut client_builder = reqwest::Client::builder();
242        #[cfg(not(target_arch = "wasm32"))]
243        let mut client_builder =
244            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
245
246        client_builder = match () {
247            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
248            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
249            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
250            () => client_builder,
251        };
252
253        #[cfg(not(target_arch = "wasm32"))]
254        for c in &config.extra_root_certificates {
255            let cert = match c.encoding {
256                CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
257                CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
258            };
259            client_builder = client_builder.add_root_certificate(cert);
260        }
261
262        Ok(Self {
263            config: Arc::new(config),
264            client: client_builder.build()?,
265            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
266            ..Default::default()
267        })
268    }
269}
270
271impl Client {
272    /// Create a new client with the supplied config
273    pub fn new(config: ClientConfig) -> Self {
274        Client::try_from(config).unwrap_or_else(|err| {
275            warn!("Cannot create OCI client from config: {:?}", err);
276            warn!("Creating client with default configuration");
277            Self {
278                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
279                ..Default::default()
280            }
281        })
282    }
283
284    /// Create a new client with the supplied config
285    pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
286        Self::new(config_source.client_config())
287    }
288
289    async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
290        self.auth_store
291            .write()
292            .await
293            .insert(registry.to_string(), auth);
294    }
295
296    async fn is_stored_auth(&self, registry: &str) -> bool {
297        self.auth_store.read().await.contains_key(registry)
298    }
299
300    async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
301        if !self.is_stored_auth(registry).await {
302            self.store_auth(registry, auth.clone()).await;
303        }
304    }
305
306    /// Checks if we got a token, if we don't - create it and store it in cache.
307    async fn get_auth_token(
308        &self,
309        reference: &Reference,
310        op: RegistryOperation,
311    ) -> Option<RegistryTokenType> {
312        let registry = reference.resolve_registry();
313        let auth = self.auth_store.read().await.get(registry)?.clone();
314        match self.tokens.get(reference, op).await {
315            Some(token) => Some(token),
316            None => {
317                let token = self._auth(reference, &auth, op).await.ok()??;
318                self.tokens.insert(reference, op, token.clone()).await;
319                Some(token)
320            }
321        }
322    }
323
324    /// Fetches the available Tags for the given Reference
325    ///
326    /// The client will check if it's already been authenticated and if
327    /// not will attempt to do.
328    pub async fn list_tags(
329        &self,
330        image: &Reference,
331        auth: &RegistryAuth,
332        n: Option<usize>,
333        last: Option<&str>,
334    ) -> Result<TagResponse> {
335        let op = RegistryOperation::Pull;
336        let url = self.to_list_tags_url(image);
337
338        self.store_auth_if_needed(image.resolve_registry(), auth)
339            .await;
340
341        let request = self.client.get(&url);
342        let request = if let Some(num) = n {
343            request.query(&[("n", num)])
344        } else {
345            request
346        };
347        let request = if let Some(l) = last {
348            request.query(&[("last", l)])
349        } else {
350            request
351        };
352        let request = RequestBuilderWrapper {
353            client: self,
354            request_builder: request,
355        };
356        let res = request
357            .apply_auth(image, op)
358            .await?
359            .into_request_builder()
360            .send()
361            .await?;
362        let status = res.status();
363        let body = res.bytes().await?;
364
365        validate_registry_response(status, &body, &url)?;
366
367        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
368    }
369
370    /// Pull an image and return the bytes
371    ///
372    /// The client will check if it's already been authenticated and if
373    /// not will attempt to do.
374    pub async fn pull(
375        &self,
376        image: &Reference,
377        auth: &RegistryAuth,
378        accepted_media_types: Vec<&str>,
379    ) -> Result<ImageData> {
380        debug!("Pulling image: {:?}", image);
381        self.store_auth_if_needed(image.resolve_registry(), auth)
382            .await;
383
384        let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
385
386        self.validate_layers(&manifest, accepted_media_types)
387            .await?;
388
389        let layers = stream::iter(&manifest.layers)
390            .map(|layer| {
391                // This avoids moving `self` which is &Self
392                // into the async block. We only want to capture
393                // as &Self
394                let this = &self;
395                async move {
396                    let mut out: Vec<u8> = Vec::new();
397                    debug!("Pulling image layer");
398                    this.pull_blob(image, layer, &mut out).await?;
399                    Ok::<_, OciDistributionError>(ImageLayer::new(
400                        out,
401                        layer.media_type.clone(),
402                        layer.annotations.clone(),
403                    ))
404                }
405            })
406            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
407            .buffer_unordered(self.config.max_concurrent_download)
408            .try_collect()
409            .await?;
410
411        Ok(ImageData {
412            layers,
413            manifest: Some(manifest),
414            config,
415            digest: Some(digest),
416        })
417    }
418
419    /// Push an image and return the uploaded URL of the image
420    ///
421    /// The client will check if it's already been authenticated and if
422    /// not will attempt to do.
423    ///
424    /// If a manifest is not provided, the client will attempt to generate
425    /// it from the provided image and config data.
426    ///
427    /// Returns pullable URL for the image
428    pub async fn push(
429        &self,
430        image_ref: &Reference,
431        layers: &[ImageLayer],
432        config: Config,
433        auth: &RegistryAuth,
434        manifest: Option<OciImageManifest>,
435    ) -> Result<PushResponse> {
436        debug!("Pushing image: {:?}", image_ref);
437        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
438            .await;
439
440        let manifest: OciImageManifest = match manifest {
441            Some(m) => m,
442            None => OciImageManifest::build(layers, &config, None),
443        };
444
445        // Upload layers
446        stream::iter(layers)
447            .map(|layer| {
448                // This avoids moving `self` which is &Self
449                // into the async block. We only want to capture
450                // as &Self
451                let this = &self;
452                async move {
453                    let digest = layer.sha256_digest();
454                    this.push_blob(image_ref, &layer.data, &digest).await?;
455                    Result::Ok(())
456                }
457            })
458            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
459            .buffer_unordered(self.config.max_concurrent_upload)
460            .try_for_each(future::ok)
461            .await?;
462
463        let config_url = self
464            .push_blob(image_ref, &config.data, &manifest.config.digest)
465            .await?;
466        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
467
468        Ok(PushResponse {
469            config_url,
470            manifest_url,
471        })
472    }
473
474    /// Pushes a blob to the registry
475    pub async fn push_blob(
476        &self,
477        image_ref: &Reference,
478        data: &[u8],
479        digest: &str,
480    ) -> Result<String> {
481        match self.push_blob_chunked(image_ref, data, digest).await {
482            Ok(url) => Ok(url),
483            Err(OciDistributionError::SpecViolationError(violation)) => {
484                warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
485                warn!("Attempting monolithic push");
486                self.push_blob_monolithically(image_ref, data, digest).await
487            }
488            Err(e) => Err(e),
489        }
490    }
491
492    /// Pushes a blob to the registry as a monolith
493    ///
494    /// Returns the pullable location of the blob
495    async fn push_blob_monolithically(
496        &self,
497        image: &Reference,
498        blob_data: &[u8],
499        blob_digest: &str,
500    ) -> Result<String> {
501        let location = self.begin_push_monolithical_session(image).await?;
502        self.push_monolithically(&location, image, blob_data, blob_digest)
503            .await
504    }
505
506    /// Pushes a blob to the registry as a series of chunks
507    ///
508    /// Returns the pullable location of the blob
509    async fn push_blob_chunked(
510        &self,
511        image: &Reference,
512        blob_data: &[u8],
513        blob_digest: &str,
514    ) -> Result<String> {
515        let mut location = self.begin_push_chunked_session(image).await?;
516        let mut start: usize = 0;
517        loop {
518            (location, start) = self.push_chunk(&location, image, blob_data, start).await?;
519            if start >= blob_data.len() {
520                break;
521            }
522        }
523        self.end_push_chunked_session(&location, image, blob_digest)
524            .await
525    }
526
527    /// Perform an OAuth v2 auth request if necessary.
528    ///
529    /// This performs authorization and then stores the token internally to be used
530    /// on other requests.
531    pub async fn auth(
532        &self,
533        image: &Reference,
534        authentication: &RegistryAuth,
535        operation: RegistryOperation,
536    ) -> Result<Option<String>> {
537        self.store_auth_if_needed(image.resolve_registry(), authentication)
538            .await;
539        // preserve old caching behavior
540        match self._auth(image, authentication, operation).await {
541            Ok(Some(RegistryTokenType::Bearer(token))) => {
542                self.tokens
543                    .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
544                    .await;
545                Ok(Some(token.token().to_string()))
546            }
547            Ok(Some(RegistryTokenType::Basic(username, password))) => {
548                self.tokens
549                    .insert(
550                        image,
551                        operation,
552                        RegistryTokenType::Basic(username, password),
553                    )
554                    .await;
555                Ok(None)
556            }
557            Ok(None) => Ok(None),
558            Err(e) => Err(e),
559        }
560    }
561
562    /// Internal auth that retrieves token.
563    async fn _auth(
564        &self,
565        image: &Reference,
566        authentication: &RegistryAuth,
567        operation: RegistryOperation,
568    ) -> Result<Option<RegistryTokenType>> {
569        debug!("Authorizing for image: {:?}", image);
570        // The version request will tell us where to go.
571        let url = format!(
572            "{}://{}/v2/",
573            self.config.protocol.scheme_for(image.resolve_registry()),
574            image.resolve_registry()
575        );
576        debug!(?url);
577        let res = self.client.get(&url).send().await?;
578        let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
579            Some(h) => h,
580            None => return Ok(None),
581        };
582
583        let challenge = match BearerChallenge::try_from(dist_hdr) {
584            Ok(c) => c,
585            Err(e) => {
586                debug!(error = ?e, "Falling back to HTTP Basic Auth");
587                if let RegistryAuth::Basic(username, password) = authentication {
588                    return Ok(Some(RegistryTokenType::Basic(
589                        username.to_string(),
590                        password.to_string(),
591                    )));
592                }
593                return Ok(None);
594            }
595        };
596
597        // Allow for either push or pull authentication
598        let scope = match operation {
599            RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
600            RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
601        };
602
603        let realm = challenge.realm.as_ref();
604        let service = challenge.service.as_ref();
605        let mut query = vec![("scope", &scope)];
606
607        if let Some(s) = service {
608            query.push(("service", s))
609        }
610
611        // TODO: At some point in the future, we should support sending a secret to the
612        // server for auth. This particular workflow is for read-only public auth.
613        debug!(?realm, ?service, ?scope, "Making authentication call");
614
615        let auth_res = self
616            .client
617            .get(realm)
618            .query(&query)
619            .apply_authentication(authentication)
620            .send()
621            .await?;
622
623        match auth_res.status() {
624            reqwest::StatusCode::OK => {
625                let text = auth_res.text().await?;
626                debug!("Received response from auth request: {}", text);
627                let token: RegistryToken = serde_json::from_str(&text)
628                    .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
629                debug!("Successfully authorized for image '{:?}'", image);
630                Ok(Some(RegistryTokenType::Bearer(token)))
631            }
632            _ => {
633                let reason = auth_res.text().await?;
634                debug!("Failed to authenticate for image '{:?}': {}", image, reason);
635                Err(OciDistributionError::AuthenticationFailure(reason))
636            }
637        }
638    }
639
640    /// Fetch a manifest's digest from the remote OCI Distribution service.
641    ///
642    /// If the connection has already gone through authentication, this will
643    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
644    ///
645    /// Will first attempt to read the `Docker-Content-Digest` header using a
646    /// HEAD request. If this header is not present, will make a second GET
647    /// request and return the SHA256 of the response body.
648    pub async fn fetch_manifest_digest(
649        &self,
650        image: &Reference,
651        auth: &RegistryAuth,
652    ) -> Result<String> {
653        self.store_auth_if_needed(image.resolve_registry(), auth)
654            .await;
655
656        let url = self.to_v2_manifest_url(image);
657        debug!("HEAD image manifest from {}", url);
658        let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
659            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
660            .apply_auth(image, RegistryOperation::Pull)
661            .await?
662            .into_request_builder()
663            .send()
664            .await?;
665
666        trace!(headers=?res.headers(), "Got Headers");
667        if res.headers().get("Docker-Content-Digest").is_none() {
668            debug!("GET image manifest from {}", url);
669            let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
670                .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
671                .apply_auth(image, RegistryOperation::Pull)
672                .await?
673                .into_request_builder()
674                .send()
675                .await?;
676            let status = res.status();
677            let headers = res.headers().clone();
678            trace!(headers=?res.headers(), "Got Headers");
679            let body = res.bytes().await?;
680            validate_registry_response(status, &body, &url)?;
681
682            digest_header_value(headers, Some(&body))
683        } else {
684            let status = res.status();
685            let headers = res.headers().clone();
686            let body = res.bytes().await?;
687            validate_registry_response(status, &body, &url)?;
688
689            digest_header_value(headers, None)
690        }
691    }
692
693    async fn validate_layers(
694        &self,
695        manifest: &OciImageManifest,
696        accepted_media_types: Vec<&str>,
697    ) -> Result<()> {
698        if manifest.layers.is_empty() {
699            return Err(OciDistributionError::PullNoLayersError);
700        }
701
702        for layer in &manifest.layers {
703            if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
704                return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
705                    layer.media_type.clone(),
706                ));
707            }
708        }
709
710        Ok(())
711    }
712
713    /// Pull a manifest from the remote OCI Distribution service.
714    ///
715    /// The client will check if it's already been authenticated and if
716    /// not will attempt to do.
717    ///
718    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest)
719    /// and the manifest content digest hash.
720    ///
721    /// If a multi-platform Image Index manifest is encountered, a platform-specific
722    /// Image manifest will be selected using the client's default platform resolution.
723    pub async fn pull_image_manifest(
724        &self,
725        image: &Reference,
726        auth: &RegistryAuth,
727    ) -> Result<(OciImageManifest, String)> {
728        self.store_auth_if_needed(image.resolve_registry(), auth)
729            .await;
730
731        self._pull_image_manifest(image).await
732    }
733
734    /// Pull a manifest from the remote OCI Distribution service without parsing it.
735    ///
736    /// The client will check if it's already been authenticated and if
737    /// not will attempt to do.
738    ///
739    /// A Tuple is returned containing raw byte representation of the manifest
740    /// and the manifest content digest.
741    pub async fn pull_manifest_raw(
742        &self,
743        image: &Reference,
744        auth: &RegistryAuth,
745        accepted_media_types: &[&str],
746    ) -> Result<(Vec<u8>, String)> {
747        self.store_auth_if_needed(image.resolve_registry(), auth)
748            .await;
749
750        self._pull_manifest_raw(image, accepted_media_types).await
751    }
752
753    /// Pull a manifest from the remote OCI Distribution service.
754    ///
755    /// The client will check if it's already been authenticated and if
756    /// not will attempt to do.
757    ///
758    /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest)
759    /// and the manifest content digest hash.
760    pub async fn pull_manifest(
761        &self,
762        image: &Reference,
763        auth: &RegistryAuth,
764    ) -> Result<(OciManifest, String)> {
765        self.store_auth_if_needed(image.resolve_registry(), auth)
766            .await;
767
768        self._pull_manifest(image).await
769    }
770
771    /// Pull an image manifest from the remote OCI Distribution service.
772    ///
773    /// If the connection has already gone through authentication, this will
774    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
775    ///
776    /// If a multi-platform Image Index manifest is encountered, a platform-specific
777    /// Image manifest will be selected using the client's default platform resolution.
778    async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
779        let (manifest, digest) = self._pull_manifest(image).await?;
780        match manifest {
781            OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
782            OciManifest::ImageIndex(image_index_manifest) => {
783                debug!("Inspecting Image Index Manifest");
784                let digest = if let Some(resolver) = &self.config.platform_resolver {
785                    resolver(&image_index_manifest.manifests)
786                } else {
787                    return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
788                };
789
790                match digest {
791                    Some(digest) => {
792                        debug!("Selected manifest entry with digest: {}", digest);
793                        let manifest_entry_reference = Reference::with_digest(
794                            image.registry().to_string(),
795                            image.repository().to_string(),
796                            digest.clone(),
797                        );
798                        self._pull_manifest(&manifest_entry_reference)
799                            .await
800                            .and_then(|(manifest, _digest)| match manifest {
801                                OciManifest::Image(manifest) => Ok((manifest, digest)),
802                                OciManifest::ImageIndex(_) => {
803                                    Err(OciDistributionError::ImageManifestNotFoundError(
804                                        "received Image Index manifest instead".to_string(),
805                                    ))
806                                }
807                            })
808                    }
809                    None => Err(OciDistributionError::ImageManifestNotFoundError(
810                        "no entry found in image index manifest matching client's default platform"
811                            .to_string(),
812                    )),
813                }
814            }
815        }
816    }
817
818    /// Pull a manifest from the remote OCI Distribution service without parsing it.
819    ///
820    /// If the connection has already gone through authentication, this will
821    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
822    async fn _pull_manifest_raw(
823        &self,
824        image: &Reference,
825        accepted_media_types: &[&str],
826    ) -> Result<(Vec<u8>, String)> {
827        let url = self.to_v2_manifest_url(image);
828        debug!("Pulling image manifest from {}", url);
829
830        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
831            .apply_accept(accepted_media_types)?
832            .apply_auth(image, RegistryOperation::Pull)
833            .await?
834            .into_request_builder()
835            .send()
836            .await?;
837        let headers = res.headers().clone();
838        let status = res.status();
839        let body = res.bytes().await?;
840
841        validate_registry_response(status, &body, &url)?;
842
843        let digest = digest_header_value(headers, Some(&body))?;
844
845        Ok((body.to_vec(), digest))
846    }
847
848    /// Pull a manifest from the remote OCI Distribution service.
849    ///
850    /// If the connection has already gone through authentication, this will
851    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
852    async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
853        let (body, digest) = self
854            ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
855            .await?;
856
857        let text = std::str::from_utf8(&body)?;
858
859        self.validate_image_manifest(text).await?;
860
861        debug!("Parsing response as Manifest: {}", &text);
862        let manifest = serde_json::from_str(text)
863            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
864        Ok((manifest, digest))
865    }
866
867    async fn validate_image_manifest(&self, text: &str) -> Result<()> {
868        debug!("validating manifest: {}", text);
869        let versioned: Versioned = serde_json::from_str(text)
870            .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
871        if versioned.schema_version != 2 {
872            return Err(OciDistributionError::UnsupportedSchemaVersionError(
873                versioned.schema_version,
874            ));
875        }
876        if let Some(media_type) = versioned.media_type {
877            if media_type != IMAGE_MANIFEST_MEDIA_TYPE
878                && media_type != OCI_IMAGE_MEDIA_TYPE
879                && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
880                && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
881            {
882                return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
883            }
884        }
885
886        Ok(())
887    }
888
889    /// Pull a manifest and its config from the remote OCI Distribution service.
890    ///
891    /// The client will check if it's already been authenticated and if
892    /// not will attempt to do.
893    ///
894    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest),
895    /// the manifest content digest hash and the contents of the manifests config layer
896    /// as a String.
897    pub async fn pull_manifest_and_config(
898        &self,
899        image: &Reference,
900        auth: &RegistryAuth,
901    ) -> Result<(OciImageManifest, String, String)> {
902        self.store_auth_if_needed(image.resolve_registry(), auth)
903            .await;
904
905        self._pull_manifest_and_config(image)
906            .await
907            .and_then(|(manifest, digest, config)| {
908                Ok((
909                    manifest,
910                    digest,
911                    String::from_utf8(config.data).map_err(|e| {
912                        OciDistributionError::GenericError(Some(format!(
913                            "Cannot not UTF8 compliant: {}",
914                            e
915                        )))
916                    })?,
917                ))
918            })
919    }
920
921    async fn _pull_manifest_and_config(
922        &self,
923        image: &Reference,
924    ) -> Result<(OciImageManifest, String, Config)> {
925        let (manifest, digest) = self._pull_image_manifest(image).await?;
926
927        let mut out: Vec<u8> = Vec::new();
928        debug!("Pulling config layer");
929        self.pull_blob(image, &manifest.config, &mut out).await?;
930        let media_type = manifest.config.media_type.clone();
931        let annotations = manifest.annotations.clone();
932        Ok((manifest, digest, Config::new(out, media_type, annotations)))
933    }
934
935    /// Push a manifest list to an OCI registry.
936    ///
937    /// This pushes a manifest list to an OCI registry.
938    pub async fn push_manifest_list(
939        &self,
940        reference: &Reference,
941        auth: &RegistryAuth,
942        manifest: OciImageIndex,
943    ) -> Result<String> {
944        self.store_auth_if_needed(reference.resolve_registry(), auth)
945            .await;
946        self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
947            .await
948    }
949
950    /// Pull a single layer from an OCI registry.
951    ///
952    /// This pulls the layer for a particular image that is identified by
953    /// the given layer descriptor. The image reference is used to find the
954    /// repository and the registry, but it is not used to verify that
955    /// the digest is a layer inside of the image. (The manifest is
956    /// used for that.)
957    pub async fn pull_blob<T: AsyncWrite + Unpin>(
958        &self,
959        image: &Reference,
960        layer: &OciDescriptor,
961        mut out: T,
962    ) -> Result<()> {
963        let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), &layer.digest);
964
965        let mut response = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
966            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
967            .apply_auth(image, RegistryOperation::Pull)
968            .await?
969            .into_request_builder()
970            .send()
971            .await?;
972
973        if let Some(urls) = &layer.urls {
974            for url in urls {
975                if response.error_for_status_ref().is_ok() {
976                    break;
977                }
978
979                let url = Url::parse(url)
980                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
981
982                if url.scheme() == "http" || url.scheme() == "https" {
983                    // NOTE: we must not authenticate on additional URLs as those
984                    // can be abused to leak credentials or tokens.  Please
985                    // refer to CVE-2020-15157 for more information.
986                    response =
987                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
988                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
989                            .into_request_builder()
990                            .send()
991                            .await?
992                }
993            }
994        }
995
996        let mut stream = response.error_for_status()?.bytes_stream();
997
998        while let Some(bytes) = stream.next().await {
999            out.write_all(&bytes?).await?;
1000        }
1001
1002        Ok(())
1003    }
1004
1005    /// Stream a single layer from an OCI registry.
1006    ///
1007    /// This is a streaming version of [`Client::pull_blob`].
1008    /// Returns [`Stream`](futures_util::Stream).
1009    pub async fn pull_blob_stream(
1010        &self,
1011        image: &Reference,
1012        layer: &OciDescriptor,
1013    ) -> Result<impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>> {
1014        let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), &layer.digest);
1015
1016        let mut response = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1017            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1018            .apply_auth(image, RegistryOperation::Pull)
1019            .await?
1020            .into_request_builder()
1021            .send()
1022            .await?;
1023
1024        if let Some(urls) = &layer.urls {
1025            for url in urls {
1026                if response.error_for_status_ref().is_ok() {
1027                    break;
1028                }
1029
1030                let url = Url::parse(url)
1031                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1032
1033                if url.scheme() == "http" || url.scheme() == "https" {
1034                    // NOTE: we must not authenticate on additional URLs as those
1035                    // can be abused to leak credentials or tokens.  Please
1036                    // refer to CVE-2020-15157 for more information.
1037                    response =
1038                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1039                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1040                            .into_request_builder()
1041                            .send()
1042                            .await?
1043                }
1044            }
1045        }
1046
1047        let stream = response
1048            .error_for_status()?
1049            .bytes_stream()
1050            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1051
1052        Ok(stream)
1053    }
1054
1055    /// Begins a session to push an image to registry in a monolithical way
1056    ///
1057    /// Returns URL with session UUID
1058    async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1059        let url = &self.to_v2_blob_upload_url(image);
1060        debug!(?url, "begin_push_monolithical_session");
1061        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1062            .apply_auth(image, RegistryOperation::Push)
1063            .await?
1064            .into_request_builder()
1065            .send()
1066            .await?;
1067
1068        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1069        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1070            .await
1071    }
1072
1073    /// Begins a session to push an image to registry as a series of chunks
1074    ///
1075    /// Returns URL with session UUID
1076    async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1077        let url = &self.to_v2_blob_upload_url(image);
1078        debug!(?url, "begin_push_session");
1079        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1080            .apply_auth(image, RegistryOperation::Push)
1081            .await?
1082            .into_request_builder()
1083            .header("Content-Length", 0)
1084            .send()
1085            .await?;
1086
1087        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1088        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1089            .await
1090    }
1091
1092    /// Closes the chunked push session
1093    ///
1094    /// Returns the pullable URL for the image
1095    async fn end_push_chunked_session(
1096        &self,
1097        location: &str,
1098        image: &Reference,
1099        digest: &str,
1100    ) -> Result<String> {
1101        let url = Url::parse_with_params(location, &[("digest", digest)])
1102            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1103        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1104            .apply_auth(image, RegistryOperation::Push)
1105            .await?
1106            .into_request_builder()
1107            .header("Content-Length", 0)
1108            .send()
1109            .await?;
1110        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1111            .await
1112    }
1113
1114    /// Pushes a layer to a registry as a monolithical blob.
1115    ///
1116    /// Returns the URL location for the next layer
1117    async fn push_monolithically(
1118        &self,
1119        location: &str,
1120        image: &Reference,
1121        layer: &[u8],
1122        blob_digest: &str,
1123    ) -> Result<String> {
1124        let mut url = Url::parse(location).unwrap();
1125        url.query_pairs_mut().append_pair("digest", blob_digest);
1126        let url = url.to_string();
1127
1128        debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1129        if layer.is_empty() {
1130            return Err(OciDistributionError::PushNoDataError);
1131        };
1132        let mut headers = HeaderMap::new();
1133        headers.insert(
1134            "Content-Length",
1135            format!("{}", layer.len()).parse().unwrap(),
1136        );
1137        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1138
1139        let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1140            .apply_auth(image, RegistryOperation::Push)
1141            .await?
1142            .into_request_builder()
1143            .headers(headers)
1144            .body(layer.to_vec())
1145            .send()
1146            .await?;
1147
1148        // Returns location
1149        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1150            .await
1151    }
1152
1153    /// Pushes a single chunk of a blob to a registry,
1154    /// as part of a chunked blob upload.
1155    ///
1156    /// Returns the URL location for the next chunk
1157    async fn push_chunk(
1158        &self,
1159        location: &str,
1160        image: &Reference,
1161        blob_data: &[u8],
1162        start_byte: usize,
1163    ) -> Result<(String, usize)> {
1164        if blob_data.is_empty() {
1165            return Err(OciDistributionError::PushNoDataError);
1166        };
1167        let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
1168            start_byte + self.push_chunk_size - 1
1169        } else {
1170            blob_data.len() - 1
1171        };
1172        let body = blob_data[start_byte..end_byte + 1].to_vec();
1173        let mut headers = HeaderMap::new();
1174        headers.insert(
1175            "Content-Range",
1176            format!("{}-{}", start_byte, end_byte).parse().unwrap(),
1177        );
1178        headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
1179        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1180
1181        debug!(
1182            ?start_byte,
1183            ?end_byte,
1184            blob_data_len = blob_data.len(),
1185            body_len = body.len(),
1186            ?location,
1187            ?headers,
1188            "Pushing chunk"
1189        );
1190
1191        let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1192            .apply_auth(image, RegistryOperation::Push)
1193            .await?
1194            .into_request_builder()
1195            .headers(headers)
1196            .body(body)
1197            .send()
1198            .await?;
1199
1200        // Returns location for next chunk and the start byte for the next range
1201        Ok((
1202            self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1203                .await?,
1204            end_byte + 1,
1205        ))
1206    }
1207
1208    /// Mounts a blob to the provided reference, from the given source
1209    pub async fn mount_blob(
1210        &self,
1211        image: &Reference,
1212        source: &Reference,
1213        digest: &str,
1214    ) -> Result<()> {
1215        let base_url = self.to_v2_blob_upload_url(image);
1216        let url = Url::parse_with_params(
1217            &base_url,
1218            &[("mount", digest), ("from", source.repository())],
1219        )
1220        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1221
1222        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1223            .apply_auth(image, RegistryOperation::Push)
1224            .await?
1225            .into_request_builder()
1226            .send()
1227            .await?;
1228
1229        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1230            .await?;
1231
1232        Ok(())
1233    }
1234
1235    /// Pushes the manifest for a specified image
1236    ///
1237    /// Returns pullable manifest URL
1238    pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1239        let mut headers = HeaderMap::new();
1240        let content_type = manifest.content_type();
1241        headers.insert("Content-Type", content_type.parse().unwrap());
1242
1243        // Serialize the manifest with a canonical json formatter, as described at
1244        // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json
1245        let mut body = Vec::new();
1246        let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1247        manifest.serialize(&mut ser).unwrap();
1248
1249        self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1250            .await
1251    }
1252
1253    /// Pushes the manifest, provided as raw bytes, for a specified image
1254    ///
1255    /// Returns pullable manifest url
1256    pub async fn push_manifest_raw(
1257        &self,
1258        image: &Reference,
1259        body: Vec<u8>,
1260        content_type: HeaderValue,
1261    ) -> Result<String> {
1262        let url = self.to_v2_manifest_url(image);
1263        debug!(?url, ?content_type, "push manifest");
1264
1265        let mut headers = HeaderMap::new();
1266        headers.insert("Content-Type", content_type);
1267
1268        // Calculate the digest of the manifest, this is useful
1269        // if the remote registry is violating the OCI Distribution Specification.
1270        // See below for more details.
1271        let manifest_hash = sha256_digest(&body);
1272
1273        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1274            .apply_auth(image, RegistryOperation::Push)
1275            .await?
1276            .into_request_builder()
1277            .headers(headers)
1278            .body(body)
1279            .send()
1280            .await?;
1281
1282        let ret = self
1283            .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1284            .await;
1285
1286        if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1287            // The registry is violating the OCI Distribution Spec, BUT the OCI
1288            // image/artifact has been uploaded successfully.
1289            // The `Location` header contains the sha256 digest of the manifest,
1290            // we can reuse the value we calculated before.
1291            // The workaround is there because repositories such as
1292            // AWS ECR are violating this aspect of the spec. This at least let the
1293            // oci-distribution users interact with these registries.
1294            warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1295
1296            let url_base = url
1297                .strip_suffix(image.tag().unwrap_or("latest"))
1298                .expect("The manifest URL always ends with the image tag suffix");
1299            let url_by_digest = format!("{}{}", url_base, manifest_hash);
1300
1301            return Ok(url_by_digest);
1302        }
1303
1304        ret
1305    }
1306
1307    async fn extract_location_header(
1308        &self,
1309        image: &Reference,
1310        res: reqwest::Response,
1311        expected_status: &reqwest::StatusCode,
1312    ) -> Result<String> {
1313        debug!(expected_status_code=?expected_status.as_u16(),
1314            status_code=?res.status().as_u16(),
1315            "extract location header");
1316        if res.status().eq(expected_status) {
1317            let location_header = res.headers().get("Location");
1318            debug!(location=?location_header, "Location header");
1319            match location_header {
1320                None => Err(OciDistributionError::RegistryNoLocationError),
1321                Some(lh) => self.location_header_to_url(image, lh),
1322            }
1323        } else if res.status().is_success() && expected_status.is_success() {
1324            Err(OciDistributionError::SpecViolationError(format!(
1325                "Expected HTTP Status {}, got {} instead",
1326                expected_status,
1327                res.status(),
1328            )))
1329        } else {
1330            let url = res.url().to_string();
1331            let code = res.status().as_u16();
1332            let message = res.text().await?;
1333            Err(OciDistributionError::ServerError { url, code, message })
1334        }
1335    }
1336
1337    /// Helper function to convert location header to URL
1338    ///
1339    /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path)
1340    /// Returns a properly formatted absolute URL
1341    fn location_header_to_url(
1342        &self,
1343        image: &Reference,
1344        location_header: &reqwest::header::HeaderValue,
1345    ) -> Result<String> {
1346        let lh = location_header.to_str()?;
1347        if lh.starts_with("/v2/") {
1348            Ok(format!(
1349                "{}://{}{}",
1350                self.config.protocol.scheme_for(image.resolve_registry()),
1351                image.resolve_registry(),
1352                lh
1353            ))
1354        } else {
1355            Ok(lh.to_string())
1356        }
1357    }
1358
1359    /// Convert a Reference to a v2 manifest URL.
1360    fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1361        if let Some(digest) = reference.digest() {
1362            format!(
1363                "{}://{}/v2/{}/manifests/{}",
1364                self.config
1365                    .protocol
1366                    .scheme_for(reference.resolve_registry()),
1367                reference.resolve_registry(),
1368                reference.repository(),
1369                digest,
1370            )
1371        } else {
1372            format!(
1373                "{}://{}/v2/{}/manifests/{}",
1374                self.config
1375                    .protocol
1376                    .scheme_for(reference.resolve_registry()),
1377                reference.resolve_registry(),
1378                reference.repository(),
1379                reference.tag().unwrap_or("latest")
1380            )
1381        }
1382    }
1383
1384    /// Convert a Reference to a v2 blob (layer) URL.
1385    fn to_v2_blob_url(&self, registry: &str, repository: &str, digest: &str) -> String {
1386        format!(
1387            "{}://{}/v2/{}/blobs/{}",
1388            self.config.protocol.scheme_for(registry),
1389            registry,
1390            repository,
1391            digest,
1392        )
1393    }
1394
1395    /// Convert a Reference to a v2 blob upload URL.
1396    fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1397        self.to_v2_blob_url(
1398            reference.resolve_registry(),
1399            reference.repository(),
1400            "uploads/",
1401        )
1402    }
1403
1404    fn to_list_tags_url(&self, reference: &Reference) -> String {
1405        format!(
1406            "{}://{}/v2/{}/tags/list",
1407            self.config
1408                .protocol
1409                .scheme_for(reference.resolve_registry()),
1410            reference.resolve_registry(),
1411            reference.repository(),
1412        )
1413    }
1414}
1415
1416/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404.
1417/// Obviously, HTTP servers are going to send other codes. This tries to catch the
1418/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error.
1419fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1420    match status {
1421        reqwest::StatusCode::OK => Ok(()),
1422        reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1423            url: url.to_string(),
1424        }),
1425        s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1426            "Expected HTTP Status {}, got {} instead",
1427            reqwest::StatusCode::OK,
1428            status,
1429        ))),
1430        s if s.is_client_error() => {
1431            let text = std::str::from_utf8(body)?;
1432            // According to the OCI spec, we should see an error in the message body.
1433            let envelope = serde_json::from_str::<OciEnvelope>(text)?;
1434            Err(OciDistributionError::RegistryError {
1435                envelope,
1436                url: url.to_string(),
1437            })
1438        }
1439        s => {
1440            let text = std::str::from_utf8(body)?;
1441
1442            Err(OciDistributionError::ServerError {
1443                code: s.as_u16(),
1444                url: url.to_string(),
1445                message: text.to_string(),
1446            })
1447        }
1448    }
1449}
1450
1451/// The request builder wrapper allows to be instantiated from a
1452/// `Client` and allows composable operations on the request builder,
1453/// to produce a `RequestBuilder` object that can be executed.
1454struct RequestBuilderWrapper<'a> {
1455    client: &'a Client,
1456    request_builder: RequestBuilder,
1457}
1458
1459// RequestBuilderWrapper type management
1460impl<'a> RequestBuilderWrapper<'a> {
1461    /// Create a `RequestBuilderWrapper` from a `Client` instance, by
1462    /// instantiating the internal `RequestBuilder` with the provided
1463    /// function `f`.
1464    fn from_client(
1465        client: &'a Client,
1466        f: impl Fn(&reqwest::Client) -> RequestBuilder,
1467    ) -> RequestBuilderWrapper {
1468        let request_builder = f(&client.client);
1469        RequestBuilderWrapper {
1470            client,
1471            request_builder,
1472        }
1473    }
1474
1475    // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper`
1476    fn into_request_builder(self) -> RequestBuilder {
1477        self.request_builder
1478    }
1479}
1480
1481// Composable functions applicable to a `RequestBuilderWrapper`
1482impl<'a> RequestBuilderWrapper<'a> {
1483    fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
1484        let request_builder = self
1485            .request_builder
1486            .try_clone()
1487            .ok_or_else(|| {
1488                OciDistributionError::GenericError(Some(
1489                    "could not clone request builder".to_string(),
1490                ))
1491            })?
1492            .header("Accept", Vec::from(accept).join(", "));
1493
1494        Ok(RequestBuilderWrapper {
1495            client: self.client,
1496            request_builder,
1497        })
1498    }
1499
1500    /// Updates request as necessary for authentication.
1501    ///
1502    /// If the struct has Some(bearer), this will insert the bearer token in an
1503    /// Authorization header. It will also set the Accept header, which must
1504    /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth
1505    /// credentials, these will be configured.
1506    async fn apply_auth(
1507        &self,
1508        image: &Reference,
1509        op: RegistryOperation,
1510    ) -> Result<RequestBuilderWrapper> {
1511        let mut headers = HeaderMap::new();
1512
1513        if let Some(token) = self.client.get_auth_token(image, op).await {
1514            match token {
1515                RegistryTokenType::Bearer(token) => {
1516                    debug!("Using bearer token authentication.");
1517                    headers.insert("Authorization", token.bearer_token().parse().unwrap());
1518                }
1519                RegistryTokenType::Basic(username, password) => {
1520                    debug!("Using HTTP basic authentication.");
1521                    return Ok(RequestBuilderWrapper {
1522                        client: self.client,
1523                        request_builder: self
1524                            .request_builder
1525                            .try_clone()
1526                            .ok_or_else(|| {
1527                                OciDistributionError::GenericError(Some(
1528                                    "could not clone request builder".to_string(),
1529                                ))
1530                            })?
1531                            .headers(headers)
1532                            .basic_auth(username.to_string(), Some(password.to_string())),
1533                    });
1534                }
1535            }
1536        }
1537        Ok(RequestBuilderWrapper {
1538            client: self.client,
1539            request_builder: self
1540                .request_builder
1541                .try_clone()
1542                .ok_or_else(|| {
1543                    OciDistributionError::GenericError(Some(
1544                        "could not clone request builder".to_string(),
1545                    ))
1546                })?
1547                .headers(headers),
1548        })
1549    }
1550}
1551
1552/// The encoding of the certificate
1553#[derive(Debug, Clone)]
1554pub enum CertificateEncoding {
1555    #[allow(missing_docs)]
1556    Der,
1557    #[allow(missing_docs)]
1558    Pem,
1559}
1560
1561/// A x509 certificate
1562#[derive(Debug, Clone)]
1563pub struct Certificate {
1564    /// Which encoding is used by the certificate
1565    pub encoding: CertificateEncoding,
1566
1567    /// Actual certificate
1568    pub data: Vec<u8>,
1569}
1570
1571/// A client configuration
1572pub struct ClientConfig {
1573    /// Which protocol the client should use
1574    pub protocol: ClientProtocol,
1575
1576    /// Accept invalid hostname. Defaults to false
1577    #[cfg(feature = "native-tls")]
1578    pub accept_invalid_hostnames: bool,
1579
1580    /// Accept invalid certificates. Defaults to false
1581    pub accept_invalid_certificates: bool,
1582
1583    /// A list of extra root certificate to trust. This can be used to connect
1584    /// to servers using self-signed certificates
1585    pub extra_root_certificates: Vec<Certificate>,
1586
1587    /// A function that defines the client's behaviour if an Image Index Manifest
1588    /// (i.e Manifest List) is encountered when pulling an image.
1589    /// Defaults to [current_platform_resolver](self::current_platform_resolver),
1590    /// which attempts to choose an image matching the running OS and Arch.
1591    ///
1592    /// If set to None, an error is raised if an Image Index manifest is received
1593    /// during an image pull.
1594    pub platform_resolver: Option<Box<PlatformResolverFn>>,
1595
1596    /// Maximum number of concurrent uploads to perform during a `push`
1597    /// operation.
1598    ///
1599    /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
1600    pub max_concurrent_upload: usize,
1601
1602    /// Maximum number of concurrent downloads to perform during a `pull`
1603    /// operation.
1604    ///
1605    /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
1606    pub max_concurrent_download: usize,
1607}
1608
1609impl Default for ClientConfig {
1610    fn default() -> Self {
1611        Self {
1612            protocol: ClientProtocol::default(),
1613            #[cfg(feature = "native-tls")]
1614            accept_invalid_hostnames: false,
1615            accept_invalid_certificates: false,
1616            extra_root_certificates: Vec::new(),
1617            platform_resolver: Some(Box::new(current_platform_resolver)),
1618            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
1619            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
1620        }
1621    }
1622}
1623
1624// Be explicit about the traits supported by this type. This is needed to use
1625// the Client behind a dynamic reference.
1626// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549
1627type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
1628
1629/// A platform resolver that chooses the first linux/amd64 variant, if present
1630pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1631    manifests
1632        .iter()
1633        .find(|entry| {
1634            entry.platform.as_ref().map_or(false, |platform| {
1635                platform.os == "linux" && platform.architecture == "amd64"
1636            })
1637        })
1638        .map(|entry| entry.digest.clone())
1639}
1640
1641/// A platform resolver that chooses the first windows/amd64 variant, if present
1642pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1643    manifests
1644        .iter()
1645        .find(|entry| {
1646            entry.platform.as_ref().map_or(false, |platform| {
1647                platform.os == "windows" && platform.architecture == "amd64"
1648            })
1649        })
1650        .map(|entry| entry.digest.clone())
1651}
1652
1653const MACOS: &str = "macos";
1654const DARWIN: &str = "darwin";
1655
1656fn go_os() -> &'static str {
1657    // Massage Rust OS var to GO OS:
1658    // - Rust: https://doc.rust-lang.org/std/env/consts/constant.OS.html
1659    // - Go: https://golang.org/doc/install/source#environment
1660    match std::env::consts::OS {
1661        MACOS => DARWIN,
1662        other => other,
1663    }
1664}
1665
1666const X86_64: &str = "x86_64";
1667const AMD64: &str = "amd64";
1668const X86: &str = "x86";
1669const AMD: &str = "amd";
1670const ARM64: &str = "arm64";
1671const AARCH64: &str = "aarch64";
1672const POWERPC64: &str = "powerpc64";
1673const PPC64LE: &str = "ppc64le";
1674
1675fn go_arch() -> &'static str {
1676    // Massage Rust Architecture vars to GO equivalent:
1677    // - Rust: https://doc.rust-lang.org/std/env/consts/constant.ARCH.html
1678    // - Go: https://golang.org/doc/install/source#environment
1679    match std::env::consts::ARCH {
1680        X86_64 => AMD64,
1681        X86 => AMD,
1682        AARCH64 => ARM64,
1683        POWERPC64 => PPC64LE,
1684        other => other,
1685    }
1686}
1687
1688/// A platform resolver that chooses the first variant matching the running OS/Arch, if present.
1689/// Doesn't currently handle platform.variants.
1690pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1691    manifests
1692        .iter()
1693        .find(|entry| {
1694            entry.platform.as_ref().map_or(false, |platform| {
1695                platform.os == go_os() && platform.architecture == go_arch()
1696            })
1697        })
1698        .map(|entry| entry.digest.clone())
1699}
1700
1701/// The protocol that the client should use to connect
1702#[derive(Debug, Clone, PartialEq, Eq, Default)]
1703pub enum ClientProtocol {
1704    #[allow(missing_docs)]
1705    Http,
1706    #[allow(missing_docs)]
1707    #[default]
1708    Https,
1709    #[allow(missing_docs)]
1710    HttpsExcept(Vec<String>),
1711}
1712
1713impl ClientProtocol {
1714    fn scheme_for(&self, registry: &str) -> &str {
1715        match self {
1716            ClientProtocol::Https => "https",
1717            ClientProtocol::Http => "http",
1718            ClientProtocol::HttpsExcept(exceptions) => {
1719                if exceptions.contains(&registry.to_owned()) {
1720                    "http"
1721                } else {
1722                    "https"
1723                }
1724            }
1725        }
1726    }
1727}
1728
1729#[derive(Clone, Debug)]
1730struct BearerChallenge {
1731    pub realm: Box<str>,
1732    pub service: Option<String>,
1733}
1734
1735impl TryFrom<&HeaderValue> for BearerChallenge {
1736    type Error = String;
1737
1738    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
1739        let parser = ChallengeParser::new(
1740            value
1741                .to_str()
1742                .map_err(|e| format!("cannot convert header value to string: {:?}", e))?,
1743        );
1744        parser
1745            .filter_map(|parser_res| {
1746                if let Ok(chalenge_ref) = parser_res {
1747                    let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
1748                    bearer_challenge.ok()
1749                } else {
1750                    None
1751                }
1752            })
1753            .next()
1754            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
1755    }
1756}
1757
1758impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
1759    type Error = String;
1760
1761    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
1762        if !value.scheme.eq_ignore_ascii_case("Bearer") {
1763            return Err(format!(
1764                "BearerChallenge doesn't support challenge scheme {:?}",
1765                value.scheme
1766            ));
1767        }
1768        let mut realm = None;
1769        let mut service = None;
1770        for (k, v) in &value.params {
1771            if k.eq_ignore_ascii_case("realm") {
1772                realm = Some(v.to_unescaped());
1773            }
1774
1775            if k.eq_ignore_ascii_case("service") {
1776                service = Some(v.to_unescaped());
1777            }
1778        }
1779
1780        let realm = realm.ok_or("missing required parameter realm")?;
1781
1782        Ok(BearerChallenge {
1783            realm: realm.into_boxed_str(),
1784            service,
1785        })
1786    }
1787}
1788
1789/// Extract `Docker-Content-Digest` header from manifest GET or HEAD request.
1790/// Can optionally supply a response body (i.e. the manifest itself) to
1791/// fallback to manually hashing this content. This should only be done if the
1792/// response body contains the image manifest.
1793fn digest_header_value(headers: HeaderMap, body: Option<&[u8]>) -> Result<String> {
1794    let digest_header = headers.get("Docker-Content-Digest");
1795    match digest_header {
1796        None => {
1797            if let Some(body) = body {
1798                // Fallback to hashing payload (tested with ECR)
1799                let digest = sha2::Sha256::digest(body);
1800                let hex = format!("sha256:{:x}", digest);
1801                debug!(%hex, "Computed digest of manifest payload.");
1802                Ok(hex)
1803            } else {
1804                Err(OciDistributionError::RegistryNoDigestError)
1805            }
1806        }
1807        Some(hv) => hv
1808            .to_str()
1809            .map(|s| s.to_string())
1810            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string()))),
1811    }
1812}
1813
1814#[cfg(test)]
1815mod test {
1816    use super::*;
1817    use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
1818    use std::convert::TryFrom;
1819    use std::fs;
1820    use std::path;
1821    use std::result::Result;
1822    use tempfile::TempDir;
1823    use tokio::io::AsyncReadExt;
1824    use tokio_util::io::StreamReader;
1825
1826    #[cfg(feature = "test-registry")]
1827    use testcontainers::{clients, core::WaitFor, GenericImage};
1828
1829    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
1830    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
1831    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
1832    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
1833    const TEST_IMAGES: &[&str] = &[
1834        // TODO(jlegrone): this image cannot be pulled currently because no `latest`
1835        //                 tag exists on the image repository. Re-enable this image
1836        //                 in tests once `latest` is published.
1837        // HELLO_IMAGE_NO_TAG,
1838        HELLO_IMAGE_TAG,
1839        HELLO_IMAGE_DIGEST,
1840        HELLO_IMAGE_TAG_AND_DIGEST,
1841    ];
1842    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
1843    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
1844    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
1845    const HTPASSWD_USERNAME: &str = "testuser";
1846    const HTPASSWD_PASSWORD: &str = "testpassword";
1847
1848    #[test]
1849    fn test_apply_accept() -> anyhow::Result<()> {
1850        assert_eq!(
1851            RequestBuilderWrapper::from_client(&Client::default(), |client| client
1852                .get("https://example.com/some/module.wasm"))
1853            .apply_accept(&["*/*"])?
1854            .into_request_builder()
1855            .build()?
1856            .headers()["Accept"],
1857            "*/*"
1858        );
1859
1860        assert_eq!(
1861            RequestBuilderWrapper::from_client(&Client::default(), |client| client
1862                .get("https://example.com/some/module.wasm"))
1863            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1864            .into_request_builder()
1865            .build()?
1866            .headers()["Accept"],
1867            MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
1868        );
1869
1870        Ok(())
1871    }
1872
1873    #[tokio::test]
1874    async fn test_apply_auth_no_token() -> anyhow::Result<()> {
1875        assert!(
1876            !RequestBuilderWrapper::from_client(&Client::default(), |client| client
1877                .get("https://example.com/some/module.wasm"))
1878            .apply_auth(
1879                &Reference::try_from(HELLO_IMAGE_TAG)?,
1880                RegistryOperation::Pull
1881            )
1882            .await?
1883            .into_request_builder()
1884            .build()?
1885            .headers()
1886            .contains_key("Authorization")
1887        );
1888
1889        Ok(())
1890    }
1891
1892    #[tokio::test]
1893    async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
1894        use hmac::{Hmac, Mac};
1895        use jwt::SignWithKey;
1896        use sha2::Sha256;
1897        let client = Client::default();
1898        let header = jwt::header::Header {
1899            algorithm: jwt::algorithm::AlgorithmType::Hs256,
1900            key_id: None,
1901            type_: None,
1902            content_type: None,
1903        };
1904        let claims: jwt::claims::Claims = Default::default();
1905        let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
1906        let token = jwt::Token::new(header, claims)
1907            .sign_with_key(&key)?
1908            .as_str()
1909            .to_string();
1910
1911        // we have to have it in the stored auth so we'll get to the token cache check.
1912        client
1913            .store_auth(
1914                &Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
1915                RegistryAuth::Anonymous,
1916            )
1917            .await;
1918
1919        client
1920            .tokens
1921            .insert(
1922                &Reference::try_from(HELLO_IMAGE_TAG)?,
1923                RegistryOperation::Pull,
1924                RegistryTokenType::Bearer(RegistryToken::Token {
1925                    token: token.clone(),
1926                }),
1927            )
1928            .await;
1929        assert_eq!(
1930            RequestBuilderWrapper::from_client(&client, |client| client
1931                .get("https://example.com/some/module.wasm"))
1932            .apply_auth(
1933                &Reference::try_from(HELLO_IMAGE_TAG)?,
1934                RegistryOperation::Pull
1935            )
1936            .await?
1937            .into_request_builder()
1938            .build()?
1939            .headers()["Authorization"],
1940            format!("Bearer {}", &token)
1941        );
1942
1943        Ok(())
1944    }
1945
1946    #[test]
1947    fn test_to_v2_blob_url() {
1948        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
1949        let blob_url = Client::default().to_v2_blob_url(
1950            image.registry(),
1951            image.repository(),
1952            "sha256:deadbeef",
1953        );
1954        assert_eq!(
1955            blob_url,
1956            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
1957        )
1958    }
1959
1960    #[test]
1961    fn test_to_v2_manifest() {
1962        let c = Client::default();
1963
1964        for &(image, expected_uri) in [
1965            (HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest"), // TODO: confirm this is the right translation when no tag
1966            (HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1"),
1967            (HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"),
1968            (HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"),
1969            ].iter() {
1970                let reference = Reference::try_from(image).expect("failed to parse reference");
1971                assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
1972            }
1973    }
1974
1975    #[test]
1976    fn test_to_v2_blob_upload_url() {
1977        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
1978        let blob_url = Client::default().to_v2_blob_upload_url(&image);
1979
1980        assert_eq!(
1981            blob_url,
1982            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
1983        )
1984    }
1985
1986    #[test]
1987    fn test_to_list_tags_url() {
1988        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
1989        let blob_url = Client::default().to_list_tags_url(&image);
1990
1991        assert_eq!(
1992            blob_url,
1993            "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
1994        )
1995    }
1996
1997    #[test]
1998    fn manifest_url_generation_respects_http_protocol() {
1999        let c = Client::new(ClientConfig {
2000            protocol: ClientProtocol::Http,
2001            ..Default::default()
2002        });
2003        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2004            .expect("Could not parse reference");
2005        assert_eq!(
2006            "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2007            c.to_v2_manifest_url(&reference)
2008        );
2009    }
2010
2011    #[test]
2012    fn blob_url_generation_respects_http_protocol() {
2013        let c = Client::new(ClientConfig {
2014            protocol: ClientProtocol::Http,
2015            ..Default::default()
2016        });
2017        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2018            .expect("Could not parse reference");
2019        assert_eq!(
2020            "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2021            c.to_v2_blob_url(
2022                reference.registry(),
2023                reference.repository(),
2024                reference.digest().unwrap()
2025            )
2026        );
2027    }
2028
2029    #[test]
2030    fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2031        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2032        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2033        let c = Client::new(ClientConfig {
2034            protocol,
2035            ..Default::default()
2036        });
2037        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2038            .expect("Could not parse reference");
2039        assert_eq!(
2040            "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2041            c.to_v2_manifest_url(&reference)
2042        );
2043    }
2044
2045    #[test]
2046    fn manifest_url_generation_uses_http_if_on_exception_list() {
2047        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2048        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2049        let c = Client::new(ClientConfig {
2050            protocol,
2051            ..Default::default()
2052        });
2053        let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2054            .expect("Could not parse reference");
2055        assert_eq!(
2056            "http://oci.registry.local/v2/hello/manifests/v1",
2057            c.to_v2_manifest_url(&reference)
2058        );
2059    }
2060
2061    #[test]
2062    fn blob_url_generation_uses_https_if_not_on_exception_list() {
2063        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2064        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2065        let c = Client::new(ClientConfig {
2066            protocol,
2067            ..Default::default()
2068        });
2069        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2070            .expect("Could not parse reference");
2071        assert_eq!(
2072            "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2073            c.to_v2_blob_url(
2074                reference.registry(),
2075                reference.repository(),
2076                reference.digest().unwrap()
2077            )
2078        );
2079    }
2080
2081    #[test]
2082    fn blob_url_generation_uses_http_if_on_exception_list() {
2083        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2084        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2085        let c = Client::new(ClientConfig {
2086            protocol,
2087            ..Default::default()
2088        });
2089        let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2090            .expect("Could not parse reference");
2091        assert_eq!(
2092            "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2093            c.to_v2_blob_url(
2094                reference.registry(),
2095                reference.repository(),
2096                reference.digest().unwrap()
2097            )
2098        );
2099    }
2100
2101    #[test]
2102    fn can_generate_valid_digest() {
2103        let bytes = b"hellobytes";
2104        let hash = sha256_digest(bytes);
2105
2106        let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2107        let combination_hash =
2108            sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2109
2110        assert_eq!(
2111            hash,
2112            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2113        );
2114        assert_eq!(
2115            combination_hash,
2116            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2117        );
2118    }
2119
2120    #[test]
2121    fn test_registry_token_deserialize() {
2122        // 'token' field, standalone
2123        let text = r#"{"token": "abc"}"#;
2124        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2125        assert!(res.is_ok());
2126        let rt = res.unwrap();
2127        assert_eq!(rt.token(), "abc");
2128
2129        // 'access_token' field, standalone
2130        let text = r#"{"access_token": "xyz"}"#;
2131        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2132        assert!(res.is_ok());
2133        let rt = res.unwrap();
2134        assert_eq!(rt.token(), "xyz");
2135
2136        // both 'token' and 'access_token' fields, 'token' field takes precedence
2137        let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2138        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2139        assert!(res.is_ok());
2140        let rt = res.unwrap();
2141        assert_eq!(rt.token(), "abc");
2142
2143        // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order)
2144        let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2145        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2146        assert!(res.is_ok());
2147        let rt = res.unwrap();
2148        assert_eq!(rt.token(), "abc");
2149
2150        // non-string fields do not break parsing
2151        let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2152        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2153        assert!(res.is_ok());
2154
2155        // Note: tokens should always be strings. The next two tests ensure that if one field
2156        // is invalid (integer), then parse can still succeed if the other field is a string.
2157        //
2158        // numeric 'access_token' field, but string 'token' field does not in parse error
2159        let text = r#"{"access_token": 300, "token": "abc"}"#;
2160        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2161        assert!(res.is_ok());
2162        let rt = res.unwrap();
2163        assert_eq!(rt.token(), "abc");
2164
2165        // numeric 'token' field, but string 'accesss_token' field does not in parse error
2166        let text = r#"{"access_token": "xyz", "token": 300}"#;
2167        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2168        assert!(res.is_ok());
2169        let rt = res.unwrap();
2170        assert_eq!(rt.token(), "xyz");
2171
2172        // numeric 'token' field results in parse error
2173        let text = r#"{"token": 300}"#;
2174        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2175        assert!(res.is_err());
2176
2177        // numeric 'access_token' field results in parse error
2178        let text = r#"{"access_token": 300}"#;
2179        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2180        assert!(res.is_err());
2181
2182        // object 'token' field results in parse error
2183        let text = r#"{"token": {"some": "thing"}}"#;
2184        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2185        assert!(res.is_err());
2186
2187        // object 'access_token' field results in parse error
2188        let text = r#"{"access_token": {"some": "thing"}}"#;
2189        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2190        assert!(res.is_err());
2191
2192        // missing fields results in parse error
2193        let text = r#"{"some": "thing"}"#;
2194        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2195        assert!(res.is_err());
2196
2197        // bad JSON results in parse error
2198        let text = r#"{"token": "abc""#;
2199        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2200        assert!(res.is_err());
2201
2202        // worse JSON results in parse error
2203        let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2204        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2205        assert!(res.is_err());
2206    }
2207
2208    fn check_auth_token(token: &str) {
2209        // We test that the token is longer than a minimal hash.
2210        assert!(token.len() > 64);
2211    }
2212
2213    #[tokio::test]
2214    async fn test_auth() {
2215        for &image in TEST_IMAGES {
2216            let reference = Reference::try_from(image).expect("failed to parse reference");
2217            let c = Client::default();
2218            let token = c
2219                .auth(
2220                    &reference,
2221                    &RegistryAuth::Anonymous,
2222                    RegistryOperation::Pull,
2223                )
2224                .await
2225                .expect("result from auth request");
2226
2227            assert!(token.is_some());
2228            check_auth_token(token.unwrap().as_ref());
2229
2230            let tok = c
2231                .tokens
2232                .get(&reference, RegistryOperation::Pull)
2233                .await
2234                .expect("token is available");
2235            // We test that the token is longer than a minimal hash.
2236            if let RegistryTokenType::Bearer(tok) = tok {
2237                check_auth_token(tok.token());
2238            } else {
2239                panic!("Unexpeted Basic Auth Token");
2240            }
2241        }
2242    }
2243
2244    #[cfg(feature = "test-registry")]
2245    #[tokio::test]
2246    async fn test_list_tags() {
2247        let docker = clients::Cli::default();
2248        let test_container = docker.run(registry_image_edge());
2249        let port = test_container.get_host_port_ipv4(5000);
2250        let auth =
2251            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2252
2253        let client = Client::new(ClientConfig {
2254            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2255            ..Default::default()
2256        });
2257
2258        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2259        client
2260            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2261            .await
2262            .expect("cannot authenticate against registry for pull operation");
2263
2264        let (manifest, _digest) = client
2265            ._pull_image_manifest(&image)
2266            .await
2267            .expect("failed to pull manifest");
2268
2269        let image_data = client
2270            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2271            .await
2272            .expect("failed to pull image");
2273
2274        for i in 0..=3 {
2275            let push_image: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, i)
2276                .parse()
2277                .unwrap();
2278            client
2279                .auth(&push_image, &auth, RegistryOperation::Push)
2280                .await
2281                .expect("authenticated");
2282            client
2283                .push(
2284                    &push_image,
2285                    &image_data.layers,
2286                    image_data.config.clone(),
2287                    &auth,
2288                    Some(manifest.clone()),
2289                )
2290                .await
2291                .expect("Failed to push Image");
2292        }
2293
2294        let image: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
2295            .parse()
2296            .unwrap();
2297        let response = client
2298            .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2299            .await
2300            .expect("Cannot list Tags");
2301        assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2302    }
2303
2304    #[tokio::test]
2305    async fn test_pull_manifest_private() {
2306        for &image in TEST_IMAGES {
2307            let reference = Reference::try_from(image).expect("failed to parse reference");
2308            // Currently, pull_manifest does not perform Authz, so this will fail.
2309            let c = Client::default();
2310            c._pull_image_manifest(&reference)
2311                .await
2312                .expect_err("pull manifest should fail");
2313
2314            // But this should pass
2315            let c = Client::default();
2316            c.auth(
2317                &reference,
2318                &RegistryAuth::Anonymous,
2319                RegistryOperation::Pull,
2320            )
2321            .await
2322            .expect("authenticated");
2323            let (manifest, _) = c
2324                ._pull_image_manifest(&reference)
2325                .await
2326                .expect("pull manifest should not fail");
2327
2328            // The test on the manifest checks all fields. This is just a brief sanity check.
2329            assert_eq!(manifest.schema_version, 2);
2330            assert!(!manifest.layers.is_empty());
2331        }
2332    }
2333
2334    #[tokio::test]
2335    async fn test_pull_manifest_public() {
2336        for &image in TEST_IMAGES {
2337            let reference = Reference::try_from(image).expect("failed to parse reference");
2338            let c = Client::default();
2339            let (manifest, _) = c
2340                .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2341                .await
2342                .expect("pull manifest should not fail");
2343
2344            // The test on the manifest checks all fields. This is just a brief sanity check.
2345            assert_eq!(manifest.schema_version, 2);
2346            assert!(!manifest.layers.is_empty());
2347        }
2348    }
2349
2350    #[tokio::test]
2351    async fn pull_manifest_and_config_public() {
2352        for &image in TEST_IMAGES {
2353            let reference = Reference::try_from(image).expect("failed to parse reference");
2354            let c = Client::default();
2355            let (manifest, _, config) = c
2356                .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2357                .await
2358                .expect("pull manifest and config should not fail");
2359
2360            // The test on the manifest checks all fields. This is just a brief sanity check.
2361            assert_eq!(manifest.schema_version, 2);
2362            assert!(!manifest.layers.is_empty());
2363            assert!(!config.is_empty());
2364        }
2365    }
2366
2367    #[tokio::test]
2368    async fn test_fetch_digest() {
2369        let c = Client::default();
2370
2371        for &image in TEST_IMAGES {
2372            let reference = Reference::try_from(image).expect("failed to parse reference");
2373            c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2374                .await
2375                .expect("pull manifest should not fail");
2376
2377            // This should pass
2378            let reference = Reference::try_from(image).expect("failed to parse reference");
2379            let c = Client::default();
2380            c.auth(
2381                &reference,
2382                &RegistryAuth::Anonymous,
2383                RegistryOperation::Pull,
2384            )
2385            .await
2386            .expect("authenticated");
2387            let digest = c
2388                .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2389                .await
2390                .expect("pull manifest should not fail");
2391
2392            assert_eq!(
2393                digest,
2394                "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2395            );
2396        }
2397    }
2398
2399    #[tokio::test]
2400    async fn test_pull_blob() {
2401        let c = Client::default();
2402
2403        for &image in TEST_IMAGES {
2404            let reference = Reference::try_from(image).expect("failed to parse reference");
2405            c.auth(
2406                &reference,
2407                &RegistryAuth::Anonymous,
2408                RegistryOperation::Pull,
2409            )
2410            .await
2411            .expect("authenticated");
2412            let (manifest, _) = c
2413                ._pull_image_manifest(&reference)
2414                .await
2415                .expect("failed to pull manifest");
2416
2417            // Pull one specific layer
2418            let mut file: Vec<u8> = Vec::new();
2419            let layer0 = &manifest.layers[0];
2420
2421            // This call likes to flake, so we try it at least 5 times
2422            let mut last_error = None;
2423            for i in 1..6 {
2424                if let Err(e) = c.pull_blob(&reference, &layer0, &mut file).await {
2425                    println!(
2426                        "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
2427                        i, e
2428                    );
2429                    last_error.replace(e);
2430                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2431                } else {
2432                    last_error = None;
2433                    break;
2434                }
2435            }
2436
2437            if let Some(e) = last_error {
2438                panic!("Unable to pull layer: {:?}", e);
2439            }
2440
2441            // The manifest says how many bytes we should expect.
2442            assert_eq!(file.len(), layer0.size as usize);
2443        }
2444    }
2445
2446    #[tokio::test]
2447    async fn test_pull_blob_stream() {
2448        let c = Client::default();
2449
2450        for &image in TEST_IMAGES {
2451            let reference = Reference::try_from(image).expect("failed to parse reference");
2452            c.auth(
2453                &reference,
2454                &RegistryAuth::Anonymous,
2455                RegistryOperation::Pull,
2456            )
2457            .await
2458            .expect("authenticated");
2459            let (manifest, _) = c
2460                ._pull_image_manifest(&reference)
2461                .await
2462                .expect("failed to pull manifest");
2463
2464            // Pull one specific layer
2465            let mut file: Vec<u8> = Vec::new();
2466            let layer0 = &manifest.layers[0];
2467
2468            let layer_stream = c
2469                .pull_blob_stream(&reference, &layer0)
2470                .await
2471                .expect("failed to pull blob stream");
2472
2473            AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream), &mut file)
2474                .await
2475                .unwrap();
2476
2477            // The manifest says how many bytes we should expect.
2478            assert_eq!(file.len(), layer0.size as usize);
2479        }
2480    }
2481
2482    #[tokio::test]
2483    async fn test_pull() {
2484        for &image in TEST_IMAGES {
2485            let reference = Reference::try_from(image).expect("failed to parse reference");
2486
2487            // This call likes to flake, so we try it at least 5 times
2488            let mut last_error = None;
2489            let mut image_data = None;
2490            for i in 1..6 {
2491                match Client::default()
2492                    .pull(
2493                        &reference,
2494                        &RegistryAuth::Anonymous,
2495                        vec![manifest::WASM_LAYER_MEDIA_TYPE],
2496                    )
2497                    .await
2498                {
2499                    Ok(data) => {
2500                        image_data = Some(data);
2501                        last_error = None;
2502                        break;
2503                    }
2504                    Err(e) => {
2505                        println!(
2506                            "Got error on pull call attempt {}. Will retry in 1s: {:?}",
2507                            i, e
2508                        );
2509                        last_error.replace(e);
2510                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2511                    }
2512                }
2513            }
2514
2515            if let Some(e) = last_error {
2516                panic!("Unable to pull layer: {:?}", e);
2517            }
2518
2519            assert!(image_data.is_some());
2520            let image_data = image_data.unwrap();
2521            assert!(!image_data.layers.is_empty());
2522            assert!(image_data.digest.is_some());
2523        }
2524    }
2525
2526    /// Attempting to pull an image without any layer validation should fail.
2527    #[tokio::test]
2528    async fn test_pull_without_layer_validation() {
2529        for &image in TEST_IMAGES {
2530            let reference = Reference::try_from(image).expect("failed to parse reference");
2531            assert!(Client::default()
2532                .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2533                .await
2534                .is_err());
2535        }
2536    }
2537
2538    /// Attempting to pull an image with the wrong list of layer validations should fail.
2539    #[tokio::test]
2540    async fn test_pull_wrong_layer_validation() {
2541        for &image in TEST_IMAGES {
2542            let reference = Reference::try_from(image).expect("failed to parse reference");
2543            assert!(Client::default()
2544                .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
2545                .await
2546                .is_err());
2547        }
2548    }
2549
2550    // This is the latest build of distribution/distribution from the `main` branch
2551    // Until distribution v3 is relased, this is the only way to have this fix
2552    // https://github.com/distribution/distribution/pull/3143
2553    //
2554    // We require this fix only when testing the capability to list tags
2555    #[cfg(feature = "test-registry")]
2556    fn registry_image_edge() -> GenericImage {
2557        GenericImage::new("distribution/distribution", "edge")
2558            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2559    }
2560
2561    #[cfg(feature = "test-registry")]
2562    fn registry_image() -> GenericImage {
2563        GenericImage::new("docker.io/library/registry", "2")
2564            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2565    }
2566
2567    #[cfg(feature = "test-registry")]
2568    fn registry_image_basic_auth(auth_path: &str) -> GenericImage {
2569        GenericImage::new("docker.io/library/registry", "2")
2570            .with_env_var("REGISTRY_AUTH", "htpasswd")
2571            .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
2572            .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
2573            .with_volume(auth_path, "/auth")
2574            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2575    }
2576
2577    #[tokio::test]
2578    #[cfg(feature = "test-registry")]
2579    async fn can_push_chunk() {
2580        let docker = clients::Cli::default();
2581        let test_container = docker.run(registry_image());
2582        let port = test_container.get_host_port_ipv4(5000);
2583
2584        let c = Client::new(ClientConfig {
2585            protocol: ClientProtocol::Http,
2586            ..Default::default()
2587        });
2588        let url = format!("localhost:{}/hello-wasm:v1", port);
2589        let image: Reference = url.parse().unwrap();
2590
2591        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
2592            .await
2593            .expect("result from auth request");
2594
2595        let location = c
2596            .begin_push_chunked_session(&image)
2597            .await
2598            .expect("failed to begin push session");
2599
2600        let image_data: Vec<Vec<u8>> = vec![b"iamawebassemblymodule".to_vec()];
2601
2602        let (next_location, next_byte) = c
2603            .push_chunk(&location, &image, &image_data[0], 0)
2604            .await
2605            .expect("failed to push layer");
2606
2607        // Location should include original URL with at session ID appended
2608        assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
2609        assert_eq!(
2610            next_byte,
2611            "iamawebassemblymodule".to_string().into_bytes().len()
2612        );
2613
2614        let layer_location = c
2615            .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data[0]))
2616            .await
2617            .expect("failed to end push session");
2618
2619        assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
2620    }
2621
2622    #[tokio::test]
2623    #[cfg(feature = "test-registry")]
2624    async fn can_push_multiple_chunks() {
2625        let docker = clients::Cli::default();
2626        let test_container = docker.run(registry_image());
2627        let port = test_container.get_host_port_ipv4(5000);
2628
2629        let mut c = Client::new(ClientConfig {
2630            protocol: ClientProtocol::Http,
2631            ..Default::default()
2632        });
2633        // set a super small chunk size - done to force multiple pushes
2634        c.push_chunk_size = 3;
2635        let url = format!("localhost:{}/hello-wasm:v1", port);
2636        let image: Reference = url.parse().unwrap();
2637
2638        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
2639            .await
2640            .expect("result from auth request");
2641
2642        let image_data: Vec<u8> =
2643            b"i am a big webassembly mode that needs chunked uploads".to_vec();
2644        let image_digest = sha256_digest(&image_data);
2645
2646        let location = c
2647            .push_blob_chunked(&image, &image_data, &image_digest)
2648            .await
2649            .expect("failed to begin push session");
2650
2651        assert_eq!(
2652            location,
2653            format!(
2654                "http://localhost:{}/v2/hello-wasm/blobs/{}",
2655                port, image_digest
2656            )
2657        );
2658    }
2659
2660    #[tokio::test]
2661    #[cfg(feature = "test-registry")]
2662    async fn test_image_roundtrip_anon_auth() {
2663        let docker = clients::Cli::default();
2664        let test_container = docker.run(registry_image());
2665
2666        test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
2667    }
2668
2669    #[tokio::test]
2670    #[cfg(feature = "test-registry")]
2671    async fn test_image_roundtrip_basic_auth() {
2672        let auth_dir = TempDir::new().expect("cannot create tmp directory");
2673        let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
2674        fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
2675
2676        let docker = clients::Cli::default();
2677        let image = registry_image_basic_auth(
2678            auth_dir
2679                .path()
2680                .to_str()
2681                .expect("cannot convert htpasswd_path to string"),
2682        );
2683        let test_container = docker.run(image);
2684
2685        let auth =
2686            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2687
2688        test_image_roundtrip(&auth, &test_container).await;
2689    }
2690
2691    #[cfg(feature = "test-registry")]
2692    async fn test_image_roundtrip(
2693        registry_auth: &RegistryAuth,
2694        test_container: &testcontainers::Container<'_, GenericImage>,
2695    ) {
2696        let _ = tracing_subscriber::fmt::try_init();
2697        let port = test_container.get_host_port_ipv4(5000);
2698
2699        let c = Client::new(ClientConfig {
2700            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2701            ..Default::default()
2702        });
2703
2704        // pulling webassembly.azurecr.io/hello-wasm:v1
2705        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2706        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2707            .await
2708            .expect("cannot authenticate against registry for pull operation");
2709
2710        let (manifest, _digest) = c
2711            ._pull_image_manifest(&image)
2712            .await
2713            .expect("failed to pull manifest");
2714
2715        let image_data = c
2716            .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2717            .await
2718            .expect("failed to pull image");
2719
2720        let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
2721        c.auth(&push_image, registry_auth, RegistryOperation::Push)
2722            .await
2723            .expect("authenticated");
2724
2725        c.push(
2726            &push_image,
2727            &image_data.layers,
2728            image_data.config.clone(),
2729            registry_auth,
2730            Some(manifest.clone()),
2731        )
2732        .await
2733        .expect("failed to push image");
2734
2735        let pulled_image_data = c
2736            .pull(
2737                &push_image,
2738                registry_auth,
2739                vec![manifest::WASM_LAYER_MEDIA_TYPE],
2740            )
2741            .await
2742            .expect("failed to pull pushed image");
2743
2744        let (pulled_manifest, _digest) = c
2745            ._pull_image_manifest(&push_image)
2746            .await
2747            .expect("failed to pull pushed image manifest");
2748
2749        assert!(image_data.layers.len() == 1);
2750        assert!(pulled_image_data.layers.len() == 1);
2751        assert_eq!(
2752            image_data.layers[0].data.len(),
2753            pulled_image_data.layers[0].data.len()
2754        );
2755        assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
2756
2757        assert_eq!(manifest.media_type, pulled_manifest.media_type);
2758        assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
2759        assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
2760    }
2761
2762    #[tokio::test]
2763    async fn test_raw_manifest_digest() {
2764        let _ = tracing_subscriber::fmt::try_init();
2765
2766        let c = Client::default();
2767
2768        // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7
2769        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2770        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2771            .await
2772            .expect("cannot authenticate against registry for pull operation");
2773
2774        let (manifest, _) = c
2775            .pull_manifest_raw(
2776                &image,
2777                &RegistryAuth::Anonymous,
2778                MIME_TYPES_DISTRIBUTION_MANIFEST,
2779            )
2780            .await
2781            .expect("failed to pull manifest");
2782
2783        // Compute the digest of the returned manifest text.
2784        let digest = sha2::Sha256::digest(manifest);
2785        let hex = format!("sha256:{:x}", digest);
2786
2787        // Validate that the computed digest and the digest in the pulled reference match.
2788        assert_eq!(image.digest().unwrap(), hex);
2789    }
2790
2791    #[tokio::test]
2792    #[cfg(feature = "test-registry")]
2793    async fn test_mount() {
2794        // initialize the registry
2795        let docker = clients::Cli::default();
2796        let test_container = docker.run(registry_image());
2797        let port = test_container.get_host_port_ipv4(5000);
2798
2799        let c = Client::new(ClientConfig {
2800            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2801            ..Default::default()
2802        });
2803
2804        // Create a dummy layer and push it to `layer-repository`
2805        let layer_reference: Reference = format!("localhost:{}/layer-repository", port)
2806            .parse()
2807            .unwrap();
2808        let layer_data = vec![1u8, 2, 3, 4];
2809        let layer = OciDescriptor {
2810            digest: sha256_digest(&layer_data),
2811            ..Default::default()
2812        };
2813        c.push_blob(&layer_reference, &[1, 2, 3, 4], &layer.digest)
2814            .await
2815            .expect("Failed to push");
2816
2817        // Mount the layer at `image-repository`
2818        let image_reference: Reference = format!("localhost:{}/image-repository", port)
2819            .parse()
2820            .unwrap();
2821        c.mount_blob(&image_reference, &layer_reference, &layer.digest)
2822            .await
2823            .expect("Failed to mount");
2824
2825        // Pull the layer from `image-repository`
2826        let mut buf = Vec::new();
2827        c.pull_blob(&image_reference, &layer, &mut buf)
2828            .await
2829            .expect("Failed to pull");
2830
2831        assert_eq!(layer_data, buf);
2832    }
2833
2834    #[tokio::test]
2835    async fn test_platform_resolution() {
2836        // test that we get an error when we pull a manifest list
2837        let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
2838        let mut c = Client::new(ClientConfig {
2839            platform_resolver: None,
2840            ..Default::default()
2841        });
2842        let err = c
2843            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2844            .await
2845            .unwrap_err();
2846        assert_eq!(
2847            format!("{}", err),
2848            "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
2849        );
2850
2851        c = Client::new(ClientConfig {
2852            platform_resolver: Some(Box::new(linux_amd64_resolver)),
2853            ..Default::default()
2854        });
2855        let (_manifest, digest) = c
2856            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2857            .await
2858            .expect("Couldn't pull manifest");
2859        assert_eq!(
2860            digest,
2861            "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
2862        );
2863    }
2864
2865    #[tokio::test]
2866    async fn test_pull_ghcr_io() {
2867        let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
2868        let c = Client::default();
2869        let (manifest, _manifest_str) = c
2870            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2871            .await
2872            .unwrap();
2873        assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
2874    }
2875
2876    #[tokio::test]
2877    #[ignore]
2878    async fn test_roundtrip_multiple_layers() {
2879        let _ = tracing_subscriber::fmt::try_init();
2880        let c = Client::new(ClientConfig {
2881            protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
2882            ..Default::default()
2883        });
2884        let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
2885        let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
2886            .expect("failed to parse reference");
2887
2888        let image = c
2889            .pull(
2890                &src_image,
2891                &RegistryAuth::Anonymous,
2892                vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
2893            )
2894            .await
2895            .expect("Failed to pull manifest");
2896        assert!(image.layers.len() > 1);
2897
2898        let ImageData {
2899            layers,
2900            config,
2901            manifest,
2902            ..
2903        } = image;
2904        c.push(
2905            &dest_image,
2906            &layers,
2907            config,
2908            &RegistryAuth::Anonymous,
2909            manifest,
2910        )
2911        .await
2912        .expect("Failed to pull manifest");
2913
2914        c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
2915            .await
2916            .expect("Failed to pull manifest");
2917    }
2918}