Skip to main content

oci_client/
client.rs

1//! OCI distribution client for fetching oci images from an OCI compliant remote store
2use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use futures_util::{future, Stream};
11use http::header::RANGE;
12use http::{HeaderValue, StatusCode};
13use http_auth::{parser::ChallengeParser, ChallengeRef};
14use oci_spec::image::{Arch, Os};
15use olpc_cjson::CanonicalFormatter;
16use reqwest::header::HeaderMap;
17use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::io::{AsyncWrite, AsyncWriteExt};
21use tokio::sync::RwLock;
22use tracing::{debug, trace, warn};
23
24pub use crate::blob::*;
25use crate::config::ConfigFile;
26use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
27use crate::errors::*;
28use crate::manifest::{
29    ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
30    IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
31    IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
32    OCI_IMAGE_MEDIA_TYPE,
33};
34use crate::secrets::RegistryAuth;
35use crate::secrets::*;
36use crate::sha256_digest;
37use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
38use crate::Reference;
39
/// Manifest media types passed to `apply_accept` when requesting a manifest
/// from the registry.
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Size in bytes (4096 * 1024) that `Client` uses as its `push_chunk_size`,
/// i.e. the upper bound for a single chunk in a chunked blob push.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default value for `ClientConfig::max_concurrent_upload`
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default value for `ClientConfig::max_concurrent_download`
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// Default value for `ClientConfig::default_token_expiration_secs`
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;

/// `<crate name>/<crate version>`, assembled at compile time from Cargo's
/// environment variables.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
59
/// The data for an image or module.
///
/// This is what [`Client::pull`] returns; on that path both `digest` and
/// `manifest` are populated (`Some`).
#[derive(Clone)]
pub struct ImageData {
    /// The layers of the image or module.
    pub layers: Vec<ImageLayer>,
    /// The digest of the image or module.
    pub digest: Option<String>,
    /// The Configuration object of the image or module.
    pub config: Config,
    /// The manifest of the image or module.
    pub manifest: Option<OciImageManifest>,
}
72
/// The data returned by an OCI registry after a successful push
/// operation is completed.
///
/// Produced by [`Client::push`]: `config_url` is the location returned when
/// the config blob was pushed, `manifest_url` the one returned when the
/// manifest was pushed.
pub struct PushResponse {
    /// Pullable url for the config
    pub config_url: String,
    /// Pullable url for the manifest
    pub manifest_url: String,
}
81
/// The data returned by a successful tags/list request.
///
/// Deserialized from the JSON response body in [`Client::list_tags`].
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// Repository name
    pub name: String,
    /// List of existing tags
    pub tags: Vec<String>,
}
90
/// The data returned by a successful catalog request.
///
/// Deserializable from a JSON object carrying a `repositories` array.
#[derive(Deserialize, Debug)]
pub struct CatalogResponse {
    /// List of available repositories in the registry.
    pub repositories: Vec<String>,
}
97
/// Layer descriptor required to pull a layer.
///
/// A borrowed view: both fields reference data owned elsewhere for the
/// lifetime `'a`.
pub struct LayerDescriptor<'a> {
    /// The digest of the layer
    pub digest: &'a str,
    /// Optional list of additional URIs to pull the layer from
    pub urls: &'a Option<Vec<String>>,
}
105
/// A trait for converting any type into a [`LayerDescriptor`].
///
/// Implemented in this file for `&str` (digest only, no extra URLs),
/// `&OciDescriptor`, `&LayerDescriptor`, and references to any implementor.
pub trait AsLayerDescriptor {
    /// Borrow `self` as a [`LayerDescriptor`] whose fields live as long as the
    /// borrow of `self`.
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
111
112impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
113    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
114        (*self).as_layer_descriptor()
115    }
116}
117
118impl AsLayerDescriptor for &str {
119    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
120        LayerDescriptor {
121            digest: self,
122            urls: &None,
123        }
124    }
125}
126
127impl AsLayerDescriptor for &OciDescriptor {
128    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
129        LayerDescriptor {
130            digest: &self.digest,
131            urls: &self.urls,
132        }
133    }
134}
135
136impl AsLayerDescriptor for &LayerDescriptor<'_> {
137    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
138        LayerDescriptor {
139            digest: self.digest,
140            urls: self.urls,
141        }
142    }
143}
144
/// The data and media type for an image layer.
///
/// `data` is a `bytes::Bytes`, so cloning a layer does not duplicate the
/// underlying buffer.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
    /// The data of this layer
    pub data: bytes::Bytes,
    /// The media type of this layer
    pub media_type: String,
    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
    pub annotations: Option<BTreeMap<String, String>>,
}
156
157impl ImageLayer {
158    /// Constructs a new ImageLayer struct with provided data and media type
159    pub fn new(
160        data: impl Into<bytes::Bytes>,
161        media_type: String,
162        annotations: Option<BTreeMap<String, String>>,
163    ) -> Self {
164        ImageLayer {
165            data: data.into(),
166            media_type,
167            annotations,
168        }
169    }
170
171    /// Constructs a new ImageLayer struct with provided data and
172    /// media type application/vnd.oci.image.layer.v1.tar
173    pub fn oci_v1(
174        data: impl Into<bytes::Bytes>,
175        annotations: Option<BTreeMap<String, String>>,
176    ) -> Self {
177        Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
178    }
179    /// Constructs a new ImageLayer struct with provided data and
180    /// media type application/vnd.oci.image.layer.v1.tar+gzip
181    pub fn oci_v1_gzip(
182        data: impl Into<bytes::Bytes>,
183        annotations: Option<BTreeMap<String, String>>,
184    ) -> Self {
185        Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
186    }
187
188    /// Helper function to compute the sha256 digest of an image layer
189    pub fn sha256_digest(&self) -> String {
190        sha256_digest(&self.data)
191    }
192}
193
/// The data and media type for a configuration object.
///
/// Can be converted into a `ConfigFile` through the `TryFrom<Config>`
/// implementation in this file, which parses `data` as JSON.
#[derive(Clone)]
pub struct Config {
    /// The data of this config object
    pub data: bytes::Bytes,
    /// The media type of this object
    pub media_type: String,
    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
    pub annotations: Option<BTreeMap<String, String>>,
}
205
206impl Config {
207    /// Constructs a new Config struct with provided data and media type
208    pub fn new(
209        data: impl Into<bytes::Bytes>,
210        media_type: String,
211        annotations: Option<BTreeMap<String, String>>,
212    ) -> Self {
213        Config {
214            data: data.into(),
215            media_type,
216            annotations,
217        }
218    }
219
220    /// Constructs a new Config struct with provided data and
221    /// media type application/vnd.oci.image.config.v1+json
222    pub fn oci_v1(
223        data: impl Into<bytes::Bytes>,
224        annotations: Option<BTreeMap<String, String>>,
225    ) -> Self {
226        Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
227    }
228
229    /// Construct a new Config struct with provided [`ConfigFile`] and
230    /// media type `application/vnd.oci.image.config.v1+json`
231    pub fn oci_v1_from_config_file(
232        config_file: ConfigFile,
233        annotations: Option<BTreeMap<String, String>>,
234    ) -> Result<Self> {
235        let data = serde_json::to_vec(&config_file)?;
236        Ok(Self::new(
237            data,
238            IMAGE_CONFIG_MEDIA_TYPE.to_string(),
239            annotations,
240        ))
241    }
242
243    /// Helper function to compute the sha256 digest of this config object
244    pub fn sha256_digest(&self) -> String {
245        sha256_digest(&self.data)
246    }
247}
248
249impl TryFrom<Config> for ConfigFile {
250    type Error = crate::errors::OciDistributionError;
251
252    fn try_from(config: Config) -> Result<Self> {
253        let config = String::from_utf8(config.data.into())
254            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
255        let config_file: ConfigFile = serde_json::from_str(&config)
256            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
257        Ok(config_file)
258    }
259}
260
/// The OCI client connects to an OCI registry and fetches OCI images.
///
/// An OCI registry is a container registry that adheres to the OCI Distribution
/// specification. DockerHub is one example, as are ACR and GCR. This client
/// provides a native Rust implementation for pulling OCI images.
///
/// Some OCI registries support completely anonymous access. But most require
/// at least an OAuth2 handshake. Typically, you will want to create a new
/// client, and then run the `auth()` method, which will attempt to get
/// a read-only bearer token. From there, pulling images can be done with
/// the `pull_*` functions.
///
/// For true anonymous access, you can skip `auth()`. This is not recommended
/// unless you are sure that the remote registry does not require OAuth2.
#[derive(Clone)]
pub struct Client {
    // Configuration the client was built from.
    config: Arc<ClientConfig>,
    // Registry -> RegistryAuth
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    // Tokens obtained through `_auth`, cached per reference and operation.
    tokens: TokenCache,
    // Underlying HTTP client used for every request.
    client: reqwest::Client,
    // Upper bound, in bytes, for one chunk of a chunked blob push.
    push_chunk_size: usize,
}
284
/// Builds a client with a default `ClientConfig`, an empty auth store, a token
/// cache using `DEFAULT_TOKEN_EXPIRATION_SECS`, a default `reqwest::Client`
/// and `PUSH_CHUNK_MAX_SIZE` as the push chunk size.
impl Default for Client {
    fn default() -> Self {
        Self {
            config: Arc::default(),
            auth_store: Arc::default(),
            tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
            client: reqwest::Client::default(),
            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
        }
    }
}
296
/// A source that can provide a `ClientConfig`.
/// If you are using this crate in your own application, you can implement this
/// trait on your configuration type so that it can be passed to `Client::from_source`.
pub trait ClientConfigSource {
    /// Provides a `ClientConfig`.
    ///
    /// Returns an owned value; `Client::from_source` hands it to `Client::new`.
    fn client_config(&self) -> ClientConfig;
}
304
305impl TryFrom<ClientConfig> for Client {
306    type Error = OciDistributionError;
307
308    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
309        #[allow(unused_mut)]
310        let mut client_builder = reqwest::Client::builder();
311        #[cfg(not(target_arch = "wasm32"))]
312        let mut client_builder =
313            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
314
315        client_builder = match () {
316            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
317            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
318            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
319            () => client_builder,
320        };
321
322        #[cfg(not(target_arch = "wasm32"))]
323        {
324            if !config.tls_certs_only.is_empty() {
325                client_builder =
326                    client_builder.tls_certs_only(convert_certificates(&config.tls_certs_only)?);
327            }
328            client_builder = client_builder
329                .tls_certs_merge(convert_certificates(&config.extra_root_certificates)?);
330        }
331
332        if let Some(timeout) = config.read_timeout {
333            client_builder = client_builder.read_timeout(timeout);
334        }
335        if let Some(timeout) = config.connect_timeout {
336            client_builder = client_builder.connect_timeout(timeout);
337        }
338
339        client_builder = client_builder.user_agent(config.user_agent);
340
341        if let Some(proxy_addr) = &config.https_proxy {
342            let no_proxy = config
343                .no_proxy
344                .as_ref()
345                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
346            let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
347            client_builder = client_builder.proxy(proxy);
348        }
349
350        if let Some(proxy_addr) = &config.http_proxy {
351            let no_proxy = config
352                .no_proxy
353                .as_ref()
354                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
355            let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
356            client_builder = client_builder.proxy(proxy);
357        }
358
359        let default_token_expiration_secs = config.default_token_expiration_secs;
360        Ok(Self {
361            config: Arc::new(config),
362            tokens: TokenCache::new(default_token_expiration_secs),
363            client: client_builder.build()?,
364            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
365            ..Default::default()
366        })
367    }
368}
369
370impl Client {
371    /// Create a new client with the supplied config
372    pub fn new(config: ClientConfig) -> Self {
373        let default_token_expiration_secs = config.default_token_expiration_secs;
374        Client::try_from(config).unwrap_or_else(|err| {
375            warn!("Cannot create OCI client from config: {:?}", err);
376            warn!("Creating client with default configuration");
377            Self {
378                tokens: TokenCache::new(default_token_expiration_secs),
379                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
380                ..Default::default()
381            }
382        })
383    }
384
385    /// Create a new client with the supplied config
386    pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
387        Self::new(config_source.client_config())
388    }
389
390    async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
391        self.auth_store
392            .write()
393            .await
394            .insert(registry.to_string(), auth);
395    }
396
397    async fn is_stored_auth(&self, registry: &str) -> bool {
398        self.auth_store.read().await.contains_key(registry)
399    }
400
401    /// Store the authentication information for this registry if it's not already stored in the client.
402    ///
403    /// Most of the time, you don't need to call this method directly. It's called by other
404    /// methods (where you have to provide the authentication information as parameter).
405    ///
406    /// But if you want to pull/push a blob without calling any of the other methods first, which would
407    /// store the authentication information, you can call this method to store the authentication
408    /// information manually.
409    pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
410        if !self.is_stored_auth(registry).await {
411            self.store_auth(registry, auth.clone()).await;
412        }
413    }
414
415    /// Checks if we got a token, if we don't - create it and store it in cache.
416    async fn get_auth_token(
417        &self,
418        reference: &Reference,
419        op: RegistryOperation,
420    ) -> Option<RegistryTokenType> {
421        let registry = reference.resolve_registry();
422        let auth = self.auth_store.read().await.get(registry)?.clone();
423        match self.tokens.get(reference, op).await {
424            Some(token) => Some(token),
425            None => {
426                let token = self._auth(reference, &auth, op).await.ok()??;
427                self.tokens.insert(reference, op, token.clone()).await;
428                Some(token)
429            }
430        }
431    }
432
433    /// Fetches the available Tags for the given Reference
434    ///
435    /// The client will check if it's already been authenticated and if
436    /// not will attempt to do.
437    pub async fn list_tags(
438        &self,
439        image: &Reference,
440        auth: &RegistryAuth,
441        n: Option<usize>,
442        last: Option<&str>,
443    ) -> Result<TagResponse> {
444        let op = RegistryOperation::Pull;
445        let url = self.to_list_tags_url(image);
446
447        self.store_auth_if_needed(image.resolve_registry(), auth)
448            .await;
449
450        let request = self.client.get(&url);
451        let request = if let Some(num) = n {
452            request.query(&[("n", num)])
453        } else {
454            request
455        };
456        let request = if let Some(l) = last {
457            request.query(&[("last", l)])
458        } else {
459            request
460        };
461        let request = RequestBuilderWrapper {
462            client: self,
463            request_builder: request,
464        };
465        let res = request
466            .apply_auth(image, op)
467            .await?
468            .into_request_builder()
469            .send()
470            .await?;
471        let status = res.status();
472        let body = res.bytes().await?;
473
474        validate_registry_response(status, &body, &url)?;
475
476        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
477    }
478
    /// Pull an image and return the bytes
    ///
    /// The client will check if it's already been authenticated and if
    /// not will attempt to do.
    ///
    /// The manifest and config are fetched first, the manifest's layers are
    /// validated against `accepted_media_types`, and then every layer blob is
    /// downloaded into memory. The returned [`ImageData`] always carries the
    /// manifest and its digest.
    ///
    /// # Errors
    ///
    /// Fails when the manifest/config cannot be pulled, when layer validation
    /// fails, or when any layer download fails.
    pub async fn pull(
        &self,
        image: &Reference,
        auth: &RegistryAuth,
        accepted_media_types: Vec<&str>,
    ) -> Result<ImageData> {
        debug!("Pulling image: {:?}", image);
        self.store_auth_if_needed(image.resolve_registry(), auth)
            .await;

        let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;

        self.validate_layers(&manifest, accepted_media_types)
            .await?;

        // Download the layers with up to `max_concurrent_download` in flight.
        // `buffer_unordered` yields results in completion order, so the
        // collected layers are not guaranteed to follow manifest order.
        let layers = stream::iter(&manifest.layers)
            .map(|layer| {
                // This avoids moving `self` which is &Self
                // into the async block. We only want to capture
                // as &Self
                let this = &self;
                async move {
                    let mut out: Vec<u8> = Vec::new();
                    debug!("Pulling image layer");
                    this.pull_blob(image, layer, &mut out).await?;
                    Ok::<_, OciDistributionError>(ImageLayer::new(
                        out,
                        layer.media_type.clone(),
                        layer.annotations.clone(),
                    ))
                }
            })
            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
            .buffer_unordered(self.config.max_concurrent_download)
            .try_collect()
            .await?;

        Ok(ImageData {
            layers,
            manifest: Some(manifest),
            config,
            digest: Some(digest),
        })
    }
527
528    /// Checks if a blob exists in the remote registry
529    pub async fn blob_exists(&self, image: &Reference, digest: &str) -> Result<bool> {
530        let url = self.to_v2_blob_url(image, digest);
531        let request = RequestBuilderWrapper {
532            client: self,
533            request_builder: self.client.head(&url),
534        };
535
536        let res = request
537            .apply_auth(image, RegistryOperation::Pull)
538            .await?
539            .into_request_builder()
540            .send()
541            .await?;
542
543        match res.error_for_status() {
544            Ok(_) => Ok(true),
545            Err(err) => {
546                if err.status() == Some(StatusCode::NOT_FOUND) {
547                    Ok(false)
548                } else {
549                    Err(err.into())
550                }
551            }
552        }
553    }
554
    /// Push an image and return the uploaded URL of the image
    ///
    /// The client will check if it's already been authenticated and if
    /// not will attempt to do.
    ///
    /// If a manifest is not provided, the client will attempt to generate
    /// it from the provided image and config data.
    ///
    /// Layers are pushed first, then the config blob, then the manifest.
    ///
    /// Returns pullable URL for the image
    ///
    /// # Errors
    ///
    /// Fails when any layer, the config blob, or the manifest cannot be pushed.
    pub async fn push(
        &self,
        image_ref: &Reference,
        layers: &[ImageLayer],
        config: Config,
        auth: &RegistryAuth,
        manifest: Option<OciImageManifest>,
    ) -> Result<PushResponse> {
        debug!("Pushing image: {:?}", image_ref);
        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
            .await;

        let manifest: OciImageManifest = match manifest {
            Some(m) => m,
            None => OciImageManifest::build(layers, &config, None),
        };

        // Upload layers, with up to `max_concurrent_upload` pushes in flight.
        stream::iter(layers)
            .map(|layer| {
                // This avoids moving `self` which is &Self
                // into the async block. We only want to capture
                // as &Self
                let this = &self;
                async move {
                    // The layer digest is computed from its data rather than
                    // taken from the manifest.
                    let digest = layer.sha256_digest();
                    this.push_blob(image_ref, layer.data.clone(), &digest)
                        .await?;
                    Result::Ok(())
                }
            })
            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
            .buffer_unordered(self.config.max_concurrent_upload)
            .try_for_each(future::ok)
            .await?;

        // The config blob is pushed under the digest recorded in the manifest.
        let config_url = self
            .push_blob(image_ref, config.data, &manifest.config.digest)
            .await?;
        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;

        Ok(PushResponse {
            config_url,
            manifest_url,
        })
    }
610
611    /// Pushes a blob to the registry
612    pub async fn push_blob(
613        &self,
614        image_ref: &Reference,
615        data: impl Into<bytes::Bytes>,
616        digest: &str,
617    ) -> Result<String> {
618        if self.config.use_monolithic_push {
619            return self.push_blob_monolithically(image_ref, data, digest).await;
620        }
621        let data = data.into();
622        // Cloning the bytes here is cheap (e.g. doesn't allocate anything except some space for
623        // some pointers). If any cloning happened, it is because the caller's passed data was not
624        // already a `Bytes` type or static data.
625        match self
626            .push_blob_chunked(image_ref, data.clone(), digest)
627            .await
628        {
629            Ok(url) => Ok(url),
630            Err(OciDistributionError::SpecViolationError(violation)) => {
631                warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
632                warn!("Attempting monolithic push");
633                self.push_blob_monolithically(image_ref, data, digest).await
634            }
635            Err(e) => Err(e),
636        }
637    }
638
639    /// Pushes a blob to the registry as a monolith
640    ///
641    /// Returns the pullable location of the blob
642    async fn push_blob_monolithically(
643        &self,
644        image: &Reference,
645        blob_data: impl Into<bytes::Bytes>,
646        blob_digest: &str,
647    ) -> Result<String> {
648        let location = self.begin_push_monolithical_session(image).await?;
649        self.push_monolithically(&location, image, blob_data, blob_digest)
650            .await
651    }
652
653    /// Pushes a blob to the registry as a series of chunks
654    ///
655    /// Returns the pullable location of the blob
656    async fn push_blob_chunked(
657        &self,
658        image: &Reference,
659        blob_data: impl Into<bytes::Bytes>,
660        blob_digest: &str,
661    ) -> Result<String> {
662        let mut location = self.begin_push_chunked_session(image).await?;
663        let mut start: usize = 0;
664
665        let mut blob_data: bytes::Bytes = blob_data.into();
666        while !blob_data.is_empty() {
667            let chunk_size = self.push_chunk_size.min(blob_data.len());
668            let chunk = blob_data.split_to(chunk_size);
669            (location, start) = self.push_chunk(&location, image, chunk, start).await?;
670        }
671        self.end_push_chunked_session(&location, image, blob_digest)
672            .await
673    }
674
675    /// Pushes a blob to the registry as a series of chunks from an input stream
676    ///
677    /// Returns the pullable location of the blob
678    pub async fn push_blob_stream<T: Stream<Item = Result<bytes::Bytes>>>(
679        &self,
680        image: &Reference,
681        blob_data_stream: T,
682        blob_digest: &str,
683    ) -> Result<String> {
684        let mut location = self.begin_push_chunked_session(image).await?;
685        let mut range_start = 0;
686
687        let mut blob_data_stream = pin!(blob_data_stream);
688
689        while let Some(blob_data) = blob_data_stream.next().await {
690            let mut blob_data = blob_data?;
691            while !blob_data.is_empty() {
692                let chunk = blob_data.split_to(self.push_chunk_size.min(blob_data.len()));
693                (location, range_start) = self
694                    .push_chunk(&location, image, chunk, range_start)
695                    .await?;
696            }
697        }
698        self.end_push_chunked_session(&location, image, blob_digest)
699            .await
700    }
701
702    /// Perform an OAuth v2 auth request if necessary.
703    ///
704    /// This performs authorization and then stores the token internally to be used
705    /// on other requests.
706    pub async fn auth(
707        &self,
708        image: &Reference,
709        authentication: &RegistryAuth,
710        operation: RegistryOperation,
711    ) -> Result<Option<String>> {
712        self.store_auth_if_needed(image.resolve_registry(), authentication)
713            .await;
714        // preserve old caching behavior
715        match self._auth(image, authentication, operation).await {
716            Ok(Some(RegistryTokenType::Bearer(token))) => {
717                self.tokens
718                    .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
719                    .await;
720                Ok(Some(token.token().to_string()))
721            }
722            Ok(Some(RegistryTokenType::Basic(username, password))) => {
723                self.tokens
724                    .insert(
725                        image,
726                        operation,
727                        RegistryTokenType::Basic(username, password),
728                    )
729                    .await;
730                Ok(None)
731            }
732            Ok(None) => Ok(None),
733            Err(e) => Err(e),
734        }
735    }
736
    /// Internal auth that retrieves token.
    ///
    /// Outcomes, in the order they are checked:
    /// - `RegistryAuth::Bearer` is returned as a bearer token without any request.
    /// - No `WWW-Authenticate` header on `/v2/`: `Ok(None)`.
    /// - A header that is not a usable bearer challenge: the basic credentials
    ///   if `authentication` is `RegistryAuth::Basic`, otherwise `Ok(None)`.
    /// - Otherwise the challenge's realm is queried for a token.
    ///
    /// # Errors
    ///
    /// Fails when a request cannot be sent, when the token response cannot be
    /// decoded (`RegistryTokenDecodeError`), or when the realm answers with a
    /// status other than 200 (`AuthenticationFailure`).
    async fn _auth(
        &self,
        image: &Reference,
        authentication: &RegistryAuth,
        operation: RegistryOperation,
    ) -> Result<Option<RegistryTokenType>> {
        debug!("Authorizing for image: {:?}", image);
        // The version request will tell us where to go.
        let url = format!(
            "{}://{}/v2/",
            self.config.protocol.scheme_for(image.resolve_registry()),
            image.resolve_registry()
        );
        debug!(?url);

        // A caller-supplied bearer token is used as is; no network round trip.
        if let RegistryAuth::Bearer(token) = authentication {
            return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
                token: token.clone(),
            })));
        }

        let res = self.client.get(&url).send().await?;
        let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
            Some(h) => h,
            None => return Ok(None),
        };

        let challenge = match BearerChallenge::try_from(dist_hdr) {
            Ok(c) => c,
            Err(e) => {
                debug!(error = ?e, "Falling back to HTTP Basic Auth");
                if let RegistryAuth::Basic(username, password) = authentication {
                    return Ok(Some(RegistryTokenType::Basic(
                        username.to_string(),
                        password.to_string(),
                    )));
                }
                return Ok(None);
            }
        };

        // Allow for either push or pull authentication
        let scope = match operation {
            RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
            RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
        };

        let realm = challenge.realm.as_ref();
        let service = challenge.service.as_ref();
        let mut query = vec![("scope", &scope)];

        // `service` is only sent when the challenge provided one.
        if let Some(s) = service {
            query.push(("service", s))
        }

        // TODO: At some point in the future, we should support sending a secret to the
        // server for auth. This particular workflow is for read-only public auth.
        debug!(?realm, ?service, ?scope, "Making authentication call");

        let auth_res = self
            .client
            .get(realm)
            .query(&query)
            .apply_authentication(authentication)
            .send()
            .await?;

        match auth_res.status() {
            reqwest::StatusCode::OK => {
                let text = auth_res.text().await?;
                // NOTE(review): this logs the raw auth response body, which is
                // parsed into the token just below, so the token presumably
                // ends up in debug logs. Confirm this is acceptable.
                debug!("Received response from auth request: {}", text);
                let token: RegistryToken = serde_json::from_str(&text)
                    .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
                debug!("Successfully authorized for image '{:?}'", image);
                Ok(Some(RegistryTokenType::Bearer(token)))
            }
            _ => {
                // The response body is used as the failure reason.
                let reason = auth_res.text().await?;
                debug!("Failed to authenticate for image '{:?}': {}", image, reason);
                Err(OciDistributionError::AuthenticationFailure(reason))
            }
        }
    }
821
822    /// Fetch a manifest's digest from the remote OCI Distribution service.
823    ///
824    /// If the connection has already gone through authentication, this will
825    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
826    ///
827    /// Will first attempt to read the `Docker-Content-Digest` header using a
828    /// HEAD request. If this header is not present, will make a second GET
829    /// request and return the SHA256 of the response body.
830    pub async fn fetch_manifest_digest(
831        &self,
832        image: &Reference,
833        auth: &RegistryAuth,
834    ) -> Result<String> {
835        self.store_auth_if_needed(image.resolve_registry(), auth)
836            .await;
837
838        let url = self.to_v2_manifest_url(image);
839        debug!("HEAD image manifest from {}", url);
840        let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
841            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
842            .apply_auth(image, RegistryOperation::Pull)
843            .await?
844            .into_request_builder()
845            .send()
846            .await?;
847
848        if let Some(digest) = digest_header_value(res.headers().clone())? {
849            let status = res.status();
850            let body = res.bytes().await?;
851            validate_registry_response(status, &body, &url)?;
852
853            // If the reference has a digest and the digest header has a matching algorithm, compare
854            // them and return an error if they don't match.
855            if let Some(img_digest) = image.digest() {
856                let header_digest = Digest::new(&digest)?;
857                let image_digest = Digest::new(img_digest)?;
858                if header_digest.algorithm == image_digest.algorithm
859                    && header_digest != image_digest
860                {
861                    return Err(DigestError::VerificationError {
862                        expected: img_digest.to_string(),
863                        actual: digest,
864                    }
865                    .into());
866                }
867            }
868
869            Ok(digest)
870        } else {
871            debug!("GET image manifest from {}", url);
872            let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
873                .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
874                .apply_auth(image, RegistryOperation::Pull)
875                .await?
876                .into_request_builder()
877                .send()
878                .await?;
879            let status = res.status();
880            trace!(headers = ?res.headers(), "Got Headers");
881            let headers = res.headers().clone();
882            let body = res.bytes().await?;
883            validate_registry_response(status, &body, &url)?;
884
885            validate_digest(&body, digest_header_value(headers)?, image.digest())
886                .map_err(OciDistributionError::from)
887        }
888    }
889
890    async fn validate_layers(
891        &self,
892        manifest: &OciImageManifest,
893        accepted_media_types: Vec<&str>,
894    ) -> Result<()> {
895        if manifest.layers.is_empty() {
896            return Err(OciDistributionError::PullNoLayersError);
897        }
898
899        for layer in &manifest.layers {
900            if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
901                return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
902                    layer.media_type.clone(),
903                ));
904            }
905        }
906
907        Ok(())
908    }
909
910    /// Pull a manifest from the remote OCI Distribution service.
911    ///
912    /// The client will check if it's already been authenticated and if
913    /// not will attempt to do.
914    ///
915    /// A Tuple is returned containing the [OciImageManifest]
916    /// and the manifest content digest hash.
917    ///
918    /// If a multi-platform Image Index manifest is encountered, a platform-specific
919    /// Image manifest will be selected using the client's default platform resolution.
920    pub async fn pull_image_manifest(
921        &self,
922        image: &Reference,
923        auth: &RegistryAuth,
924    ) -> Result<(OciImageManifest, String)> {
925        self.store_auth_if_needed(image.resolve_registry(), auth)
926            .await;
927
928        self._pull_image_manifest(image).await
929    }
930
931    /// Pull a manifest from the remote OCI Distribution service.
932    ///
933    /// The client will check if it's already been authenticated and if
934    /// not will attempt to do.
935    ///
936    /// Returns `(image_manifest, manifest_digest, Option<manifest_list_digest>)`.
937    /// The manifest list digest is `Some` when the original reference pointed to
938    /// an image index / manifest list; `None` when it pointed directly to a
939    /// single-platform image manifest.
940    ///
941    /// If a multi-platform Image Index manifest is encountered, a platform-specific
942    /// Image manifest will be selected using the client's default platform resolution.
943    pub async fn pull_image_manifest_and_list_digest(
944        &self,
945        image: &Reference,
946        auth: &RegistryAuth,
947    ) -> Result<(OciImageManifest, String, Option<String>)> {
948        self.store_auth_if_needed(image.resolve_registry(), auth)
949            .await;
950
951        self._pull_image_manifest_and_list_digest(image).await
952    }
953
954    /// Pull a manifest from the remote OCI Distribution service without parsing it.
955    ///
956    /// The client will check if it's already been authenticated and if
957    /// not will attempt to do.
958    ///
959    /// A Tuple is returned containing raw byte representation of the manifest
960    /// and the manifest content digest.
961    pub async fn pull_manifest_raw(
962        &self,
963        image: &Reference,
964        auth: &RegistryAuth,
965        accepted_media_types: &[&str],
966    ) -> Result<(bytes::Bytes, String)> {
967        self.store_auth_if_needed(image.resolve_registry(), auth)
968            .await;
969
970        self._pull_manifest_raw(image, accepted_media_types).await
971    }
972
973    /// Pull a manifest from the remote OCI Distribution service.
974    ///
975    /// The client will check if it's already been authenticated and if
976    /// not will attempt to do.
977    ///
978    /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest)
979    /// and the manifest content digest hash.
980    pub async fn pull_manifest(
981        &self,
982        image: &Reference,
983        auth: &RegistryAuth,
984    ) -> Result<(OciManifest, String)> {
985        self.store_auth_if_needed(image.resolve_registry(), auth)
986            .await;
987
988        self._pull_manifest(image).await
989    }
990
991    /// Pull an image manifest from the remote OCI Distribution service.
992    ///
993    /// If the connection has already gone through authentication, this will
994    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
995    ///
996    /// If a multi-platform Image Index manifest is encountered, a platform-specific
997    /// Image manifest will be selected using the client's default platform resolution.
998    async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
999        let (manifest, digest, _list_digest) =
1000            self._pull_image_manifest_and_list_digest(image).await?;
1001        Ok((manifest, digest))
1002    }
1003
1004    /// Pull an image manifest from the remote OCI Distribution service,
1005    /// also returning the manifest list digest if the image is multi-arch.
1006    ///
1007    /// If the connection has already gone through authentication, this will
1008    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1009    ///
1010    /// Returns `(image_manifest, manifest_digest, Option<manifest_list_digest>)`.
1011    /// The manifest list digest is `Some` when the original reference pointed to
1012    /// an image index / manifest list; `None` when it pointed directly to a
1013    /// single-platform image manifest.
1014    async fn _pull_image_manifest_and_list_digest(
1015        &self,
1016        image: &Reference,
1017    ) -> Result<(OciImageManifest, String, Option<String>)> {
1018        let (manifest, digest) = self._pull_manifest(image).await?;
1019        match manifest {
1020            OciManifest::Image(image_manifest) => Ok((image_manifest, digest, None)),
1021            OciManifest::ImageIndex(image_index_manifest) => {
1022                let list_digest = digest;
1023                debug!("Inspecting Image Index Manifest");
1024                let platform_digest = if let Some(resolver) = &self.config.platform_resolver {
1025                    resolver(&image_index_manifest.manifests)
1026                } else {
1027                    return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
1028                };
1029
1030                match platform_digest {
1031                    Some(platform_digest) => {
1032                        debug!("Selected manifest entry with digest: {}", platform_digest);
1033                        let manifest_entry_reference =
1034                            image.clone_with_digest(platform_digest.clone());
1035                        self._pull_manifest(&manifest_entry_reference)
1036                            .await
1037                            .and_then(|(manifest, _digest)| match manifest {
1038                                OciManifest::Image(manifest) => {
1039                                    Ok((manifest, platform_digest, Some(list_digest)))
1040                                }
1041                                OciManifest::ImageIndex(_) => {
1042                                    Err(OciDistributionError::ImageManifestNotFoundError(
1043                                        "received Image Index manifest instead".to_string(),
1044                                    ))
1045                                }
1046                            })
1047                    }
1048                    None => Err(OciDistributionError::ImageManifestNotFoundError(
1049                        "no entry found in image index manifest matching client's default platform"
1050                            .to_string(),
1051                    )),
1052                }
1053            }
1054        }
1055    }
1056
1057    /// Pull a manifest from the remote OCI Distribution service without parsing it.
1058    ///
1059    /// If the connection has already gone through authentication, this will
1060    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1061    async fn _pull_manifest_raw(
1062        &self,
1063        image: &Reference,
1064        accepted_media_types: &[&str],
1065    ) -> Result<(bytes::Bytes, String)> {
1066        let url = self.to_v2_manifest_url(image);
1067        debug!("Pulling image manifest from {}", url);
1068
1069        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1070            .apply_accept(accepted_media_types)?
1071            .apply_auth(image, RegistryOperation::Pull)
1072            .await?
1073            .into_request_builder()
1074            .send()
1075            .await?;
1076        let status = res.status();
1077        let headers = res.headers().clone();
1078        let body = res.bytes().await?;
1079
1080        validate_registry_response(status, &body, &url)?;
1081
1082        let digest_header = digest_header_value(headers)?;
1083        let digest = validate_digest(&body, digest_header, image.digest())?;
1084
1085        Ok((body, digest))
1086    }
1087
1088    /// Pull a manifest from the remote OCI Distribution service.
1089    ///
1090    /// If the connection has already gone through authentication, this will
1091    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1092    async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
1093        let (body, digest) = self
1094            ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
1095            .await?;
1096
1097        self.validate_image_manifest(&body).await?;
1098
1099        debug!("Parsing response as Manifest");
1100        let manifest = serde_json::from_slice(&body)
1101            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1102        Ok((manifest, digest))
1103    }
1104
1105    async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
1106        let versioned: Versioned = serde_json::from_slice(body)
1107            .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
1108        debug!(?versioned, "validating manifest");
1109        if versioned.schema_version != 2 {
1110            return Err(OciDistributionError::UnsupportedSchemaVersionError(
1111                versioned.schema_version,
1112            ));
1113        }
1114        if let Some(media_type) = versioned.media_type {
1115            if media_type != IMAGE_MANIFEST_MEDIA_TYPE
1116                && media_type != OCI_IMAGE_MEDIA_TYPE
1117                && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
1118                && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
1119            {
1120                return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
1121            }
1122        }
1123
1124        Ok(())
1125    }
1126
1127    /// Pull a manifest and its config from the remote OCI Distribution service.
1128    ///
1129    /// The client will check if it's already been authenticated and if
1130    /// not will attempt to do.
1131    ///
1132    /// A Tuple is returned containing the [OciImageManifest],
1133    /// the manifest content digest hash and the contents of the manifests config layer
1134    /// as a String.
1135    pub async fn pull_manifest_and_config(
1136        &self,
1137        image: &Reference,
1138        auth: &RegistryAuth,
1139    ) -> Result<(OciImageManifest, String, String)> {
1140        self.store_auth_if_needed(image.resolve_registry(), auth)
1141            .await;
1142
1143        self._pull_manifest_and_config(image)
1144            .await
1145            .and_then(|(manifest, digest, config)| {
1146                Ok((
1147                    manifest,
1148                    digest,
1149                    String::from_utf8(config.data.into()).map_err(|e| {
1150                        OciDistributionError::GenericError(Some(format!(
1151                            "Cannot parse config as UTF-8 string: {e}"
1152                        )))
1153                    })?,
1154                ))
1155            })
1156    }
1157
1158    /// Pull a manifest and its config from the remote OCI Distribution service.
1159    ///
1160    /// The client will check if it's already been authenticated and if
1161    /// not will attempt to do.
1162    ///
1163    /// Returns `(image_manifest, manifest_digest, config_json, Option<manifest_list_digest>)`.
1164    /// The manifest list digest is `Some` when the original reference pointed to
1165    /// an image index / manifest list; `None` when it pointed directly to a
1166    /// single-platform image manifest.
1167    ///
1168    /// If a multi-platform Image Index manifest is encountered, a platform-specific
1169    /// Image manifest will be selected using the client's default platform resolution.
1170    pub async fn pull_manifest_and_config_and_list_digest(
1171        &self,
1172        image: &Reference,
1173        auth: &RegistryAuth,
1174    ) -> Result<(OciImageManifest, String, String, Option<String>)> {
1175        self.store_auth_if_needed(image.resolve_registry(), auth)
1176            .await;
1177
1178        self._pull_manifest_and_config_and_list_digest(image)
1179            .await
1180            .and_then(|(manifest, digest, config, list_digest)| {
1181                Ok((
1182                    manifest,
1183                    digest,
1184                    String::from_utf8(config.data.into()).map_err(|e| {
1185                        OciDistributionError::GenericError(Some(format!(
1186                            "Cannot parse config as UTF-8 string: {e}"
1187                        )))
1188                    })?,
1189                    list_digest,
1190                ))
1191            })
1192    }
1193
1194    async fn _pull_manifest_and_config(
1195        &self,
1196        image: &Reference,
1197    ) -> Result<(OciImageManifest, String, Config)> {
1198        let (manifest, digest, config, _list_digest) = self
1199            ._pull_manifest_and_config_and_list_digest(image)
1200            .await?;
1201        Ok((manifest, digest, config))
1202    }
1203
1204    async fn _pull_manifest_and_config_and_list_digest(
1205        &self,
1206        image: &Reference,
1207    ) -> Result<(OciImageManifest, String, Config, Option<String>)> {
1208        let (manifest, digest, list_digest) =
1209            self._pull_image_manifest_and_list_digest(image).await?;
1210
1211        let mut out: Vec<u8> = Vec::new();
1212        debug!("Pulling config layer");
1213        self.pull_blob(image, &manifest.config, &mut out).await?;
1214        let media_type = manifest.config.media_type.clone();
1215        let annotations = manifest.annotations.clone();
1216        Ok((
1217            manifest,
1218            digest,
1219            Config::new(out, media_type, annotations),
1220            list_digest,
1221        ))
1222    }
1223
1224    /// Push a manifest list to an OCI registry.
1225    ///
1226    /// This pushes a manifest list to an OCI registry.
1227    pub async fn push_manifest_list(
1228        &self,
1229        reference: &Reference,
1230        auth: &RegistryAuth,
1231        manifest: OciImageIndex,
1232    ) -> Result<String> {
1233        self.store_auth_if_needed(reference.resolve_registry(), auth)
1234            .await;
1235        self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1236            .await
1237    }
1238
1239    /// Pull a single layer from an OCI registry.
1240    ///
1241    /// This pulls the layer for a particular image that is identified by the given layer
1242    /// descriptor. The layer descriptor can be anything that can be referenced as a layer
1243    /// descriptor. The image reference is used to find the repository and the registry, but it is
1244    /// not used to verify that the digest is a layer inside of the image. (The manifest is used for
1245    /// that.)
1246    pub async fn pull_blob<T: AsyncWrite>(
1247        &self,
1248        image: &Reference,
1249        layer: impl AsLayerDescriptor,
1250        out: T,
1251    ) -> Result<()> {
1252        let response = self.pull_blob_response(image, &layer, None, None).await?;
1253
1254        let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1255            .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1256            .transpose()?;
1257
1258        // With a blob pull, we need to use the digest from the layer and not the image
1259        let layer_digest = layer.as_layer_descriptor().digest.to_string();
1260        let mut layer_digester = Digester::new(&layer_digest)?;
1261
1262        let mut stream = response.error_for_status()?.bytes_stream();
1263
1264        let mut out = pin!(out);
1265
1266        while let Some(bytes) = stream.next().await {
1267            let bytes = bytes?;
1268            if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1269                digester.update(&bytes);
1270            }
1271            layer_digester.update(&bytes);
1272            out.write_all(&bytes).await?;
1273        }
1274
1275        // Ensure all buffered writes are flushed before returning.
1276        out.flush().await?;
1277
1278        if let Some((mut digester, expected)) = maybe_header_digester.take() {
1279            let digest = digester.finalize();
1280
1281            if digest != expected {
1282                return Err(DigestError::VerificationError {
1283                    expected,
1284                    actual: digest,
1285                }
1286                .into());
1287            }
1288        }
1289
1290        let digest = layer_digester.finalize();
1291        if digest != layer_digest {
1292            return Err(DigestError::VerificationError {
1293                expected: layer_digest,
1294                actual: digest,
1295            }
1296            .into());
1297        }
1298
1299        Ok(())
1300    }
1301
1302    /// Stream a single layer from an OCI registry.
1303    ///
1304    /// This is a streaming version of [`Client::pull_blob`]. Returns [`SizedStream`], which
1305    /// implements [`Stream`] or can be used directly to get the content
1306    /// length of the response
1307    ///
1308    /// # Example
1309    /// ```rust
1310    /// use std::future::Future;
1311    /// use std::io::Error;
1312    ///
1313    /// use futures_util::TryStreamExt;
1314    /// use oci_client::{Client, Reference};
1315    /// use oci_client::client::ClientConfig;
1316    /// use oci_client::manifest::OciDescriptor;
1317    ///
1318    /// async {
1319    ///   let client = Client::new(Default::default());
1320    ///   let imgRef: Reference = "busybox:latest".parse().unwrap();
1321    ///   let desc = OciDescriptor { digest: "sha256:deadbeef".to_owned(), ..Default::default() };
1322    ///   let mut stream = client.pull_blob_stream(&imgRef, &desc).await.unwrap();
1323    ///   // Check the optional content length
1324    ///   let content_length = stream.content_length.unwrap_or_default();
1325    ///   // Use as a stream
1326    ///   stream.try_next().await.unwrap().unwrap();
1327    ///   // Use the underlying stream
1328    ///   let mut stream = stream.stream;
1329    /// };
1330    /// ```
1331    pub async fn pull_blob_stream(
1332        &self,
1333        image: &Reference,
1334        layer: impl AsLayerDescriptor,
1335    ) -> Result<SizedStream> {
1336        stream_from_response(
1337            self.pull_blob_response(image, &layer, None, None).await?,
1338            layer,
1339            true,
1340        )
1341    }
1342
1343    /// Stream a single layer from an OCI registry starting with a byte offset. This can be used to
1344    /// continue downloading a layer after a network error. Please note that when doing a partial
1345    /// download (meaning it returns the [`BlobResponse::Partial`] variant), the layer digest is not
1346    /// verified as all the bytes are not available. The returned blob response will contain the
1347    /// header from the request digest, if it was set, that can be used (in addition to the digest
1348    /// from the layer) to verify the blob once all the bytes have been downloaded. Failure to do
1349    /// this means your content will not be verified.
1350    ///
1351    /// Returns [`BlobResponse`] which indicates if the response was a full or partial response.
1352    pub async fn pull_blob_stream_partial(
1353        &self,
1354        image: &Reference,
1355        layer: impl AsLayerDescriptor,
1356        offset: u64,
1357        length: Option<u64>,
1358    ) -> Result<BlobResponse> {
1359        let response = self
1360            .pull_blob_response(image, &layer, Some(offset), length)
1361            .await?;
1362
1363        let status = response.status();
1364        match status {
1365            StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1366                response, &layer, true,
1367            )?)),
1368            StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1369                response, &layer, false,
1370            )?)),
1371            _ => Err(OciDistributionError::ServerError {
1372                code: status.as_u16(),
1373                url: response.url().to_string(),
1374                message: response.text().await?,
1375            }),
1376        }
1377    }
1378
1379    /// Pull a single layer from an OCI registry.
1380    async fn pull_blob_response(
1381        &self,
1382        image: &Reference,
1383        layer: impl AsLayerDescriptor,
1384        offset: Option<u64>,
1385        length: Option<u64>,
1386    ) -> Result<Response> {
1387        let layer = layer.as_layer_descriptor();
1388        let url = self.to_v2_blob_url(image, layer.digest);
1389
1390        let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1391            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1392            .apply_auth(image, RegistryOperation::Pull)
1393            .await?
1394            .into_request_builder();
1395        if let (Some(off), Some(len)) = (offset, length) {
1396            let end = (off + len).saturating_sub(1);
1397            request = request.header(
1398                RANGE,
1399                HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1400            );
1401        } else if let Some(offset) = offset {
1402            request = request.header(
1403                RANGE,
1404                HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1405            );
1406        }
1407        let mut response = request.send().await?;
1408
1409        if let Some(urls) = &layer.urls {
1410            for url in urls {
1411                if response.error_for_status_ref().is_ok() {
1412                    break;
1413                }
1414
1415                let url = Url::parse(url)
1416                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1417
1418                if url.scheme() == "http" || url.scheme() == "https" {
1419                    // NOTE: we must not authenticate on additional URLs as those
1420                    // can be abused to leak credentials or tokens.  Please
1421                    // refer to CVE-2020-15157 for more information.
1422                    request =
1423                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1424                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1425                            .into_request_builder();
1426                    if let Some(offset) = offset {
1427                        request = request.header(
1428                            RANGE,
1429                            HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1430                        );
1431                    }
1432                    response = request.send().await?
1433                }
1434            }
1435        }
1436
1437        Ok(response)
1438    }
1439
1440    /// Begins a session to push an image to registry in a monolithical way
1441    ///
1442    /// Returns URL with session UUID
1443    async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1444        let url = &self.to_v2_blob_upload_url(image);
1445        debug!(?url, "begin_push_monolithical_session");
1446        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1447            .apply_auth(image, RegistryOperation::Push)
1448            .await?
1449            .into_request_builder()
1450            // We set "Content-Length" to 0 here even though the OCI Distribution
1451            // spec does not strictly require that. In practice we have seen that
1452            // certain registries require "Content-Length" to be present for all
1453            // types of push sessions.
1454            .header("Content-Length", 0)
1455            .send()
1456            .await?;
1457
1458        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1459        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1460            .await
1461    }
1462
1463    /// Begins a session to push an image to registry as a series of chunks
1464    ///
1465    /// Returns URL with session UUID
1466    async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1467        let url = &self.to_v2_blob_upload_url(image);
1468        debug!(?url, "begin_push_session");
1469        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1470            .apply_auth(image, RegistryOperation::Push)
1471            .await?
1472            .into_request_builder()
1473            .header("Content-Length", 0)
1474            .send()
1475            .await?;
1476
1477        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1478        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1479            .await
1480    }
1481
1482    /// Closes the chunked push session
1483    ///
1484    /// Returns the pullable URL for the image
1485    async fn end_push_chunked_session(
1486        &self,
1487        location: &str,
1488        image: &Reference,
1489        digest: &str,
1490    ) -> Result<String> {
1491        let url = Url::parse_with_params(location, &[("digest", digest)])
1492            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1493        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1494            .apply_auth(image, RegistryOperation::Push)
1495            .await?
1496            .into_request_builder()
1497            .header("Content-Length", 0)
1498            .send()
1499            .await?;
1500        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1501            .await
1502    }
1503
1504    /// Pushes a layer to a registry as a monolithical blob.
1505    ///
1506    /// Returns the URL location for the next layer
1507    async fn push_monolithically(
1508        &self,
1509        location: &str,
1510        image: &Reference,
1511        layer: impl Into<bytes::Bytes>,
1512        blob_digest: &str,
1513    ) -> Result<String> {
1514        let mut url = Url::parse(location).unwrap();
1515        url.query_pairs_mut().append_pair("digest", blob_digest);
1516        let url = url.to_string();
1517
1518        let layer = layer.into();
1519        debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1520        if layer.is_empty() {
1521            return Err(OciDistributionError::PushNoDataError);
1522        };
1523        let mut headers = HeaderMap::new();
1524        headers.insert(
1525            "Content-Length",
1526            format!("{}", layer.len()).parse().unwrap(),
1527        );
1528        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1529
1530        let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1531            .apply_auth(image, RegistryOperation::Push)
1532            .await?
1533            .into_request_builder()
1534            .headers(headers)
1535            .body(layer)
1536            .send()
1537            .await?;
1538
1539        // Returns location
1540        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1541            .await
1542    }
1543
1544    /// Pushes a single chunk of a blob to a registry, as part of a chunked blob upload.
1545    /// The caller is responsible for chunking the blob data into smaller parts, if needed.
1546    ///
1547    /// Returns the URL location for the next chunk, alongside the start of the next range to upload.
1548    async fn push_chunk(
1549        &self,
1550        location: &str,
1551        image: &Reference,
1552        blob_chunk: bytes::Bytes,
1553        range_start: usize,
1554    ) -> Result<(String, usize)> {
1555        if blob_chunk.is_empty() {
1556            return Err(OciDistributionError::PushNoDataError);
1557        };
1558
1559        let chunk_size = blob_chunk.len();
1560        let end_range_inclusive = range_start + chunk_size - 1;
1561
1562        let mut headers = HeaderMap::new();
1563        headers.insert(
1564            "Content-Range",
1565            format!("{range_start}-{end_range_inclusive}")
1566                .parse()
1567                .unwrap(),
1568        );
1569
1570        headers.insert("Content-Length", format!("{chunk_size}").parse().unwrap());
1571        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1572
1573        debug!(
1574            ?range_start,
1575            ?end_range_inclusive,
1576            chunk_size,
1577            ?location,
1578            ?headers,
1579            "Pushing chunk"
1580        );
1581
1582        let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1583            .apply_auth(image, RegistryOperation::Push)
1584            .await?
1585            .into_request_builder()
1586            .headers(headers)
1587            .body(blob_chunk)
1588            .send()
1589            .await?;
1590
1591        // Returns location for next chunk and the start byte for the next range
1592        Ok((
1593            self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1594                .await?,
1595            end_range_inclusive + 1,
1596        ))
1597    }
1598
1599    /// Mounts a blob to the provided reference, from the given source
1600    pub async fn mount_blob(
1601        &self,
1602        image: &Reference,
1603        source: &Reference,
1604        digest: &str,
1605    ) -> Result<()> {
1606        let base_url = self.to_v2_blob_upload_url(image);
1607        let url = Url::parse_with_params(
1608            &base_url,
1609            &[("mount", digest), ("from", source.repository())],
1610        )
1611        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1612
1613        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1614            .apply_auth(image, RegistryOperation::Push)
1615            .await?
1616            .into_request_builder()
1617            .send()
1618            .await?;
1619
1620        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1621            .await?;
1622
1623        Ok(())
1624    }
1625
1626    /// Pushes the manifest for a specified image
1627    ///
1628    /// Returns pullable manifest URL
1629    pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1630        let mut headers = HeaderMap::new();
1631        let content_type = manifest.content_type();
1632        headers.insert("Content-Type", content_type.parse().unwrap());
1633
1634        // Serialize the manifest with a canonical json formatter, as described at
1635        // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json
1636        let mut body = Vec::new();
1637        let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1638        manifest.serialize(&mut ser).unwrap();
1639
1640        self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1641            .await
1642    }
1643
1644    /// Pushes the manifest, provided as raw bytes, for a specified image
1645    ///
1646    /// Returns pullable manifest url
1647    pub async fn push_manifest_raw(
1648        &self,
1649        image: &Reference,
1650        body: impl Into<bytes::Bytes>,
1651        content_type: HeaderValue,
1652    ) -> Result<String> {
1653        let url = self.to_v2_manifest_url(image);
1654        debug!(?url, ?content_type, "push manifest");
1655
1656        let mut headers = HeaderMap::new();
1657        headers.insert("Content-Type", content_type);
1658
1659        let body = body.into();
1660
1661        // Calculate the digest of the manifest, this is useful
1662        // if the remote registry is violating the OCI Distribution Specification.
1663        // See below for more details.
1664        let manifest_hash = sha256_digest(&body);
1665
1666        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1667            .apply_auth(image, RegistryOperation::Push)
1668            .await?
1669            .into_request_builder()
1670            .headers(headers)
1671            .body(body)
1672            .send()
1673            .await?;
1674
1675        let ret = self
1676            .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1677            .await;
1678
1679        if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1680            // The registry is violating the OCI Distribution Spec, BUT the OCI
1681            // image/artifact has been uploaded successfully.
1682            // The `Location` header contains the sha256 digest of the manifest,
1683            // we can reuse the value we calculated before.
1684            // The workaround is there because repositories such as
1685            // AWS ECR are violating this aspect of the spec. This at least let the
1686            // oci-distribution users interact with these registries.
1687            warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1688
1689            let url_base = url
1690                .strip_suffix(image.tag().unwrap_or("latest"))
1691                .expect("The manifest URL always ends with the image tag suffix");
1692            let url_by_digest = format!("{url_base}{manifest_hash}");
1693
1694            return Ok(url_by_digest);
1695        }
1696
1697        ret
1698    }
1699
1700    /// Pulls the referrers for the given image filtering by the optionally provided artifact type.
1701    ///
1702    /// Implements the [OCI Distribution Spec referrers API][oci-referrers] with an automatic
1703    /// fallback to the [referrers tag schema][oci-tag-schema] when the registry returns a
1704    /// `404 Not Found` for the native endpoint (as required by the spec).
1705    ///
1706    /// Many registries (e.g. ghcr.io) do not implement the native
1707    /// `/v2/<name>/referrers/<digest>` endpoint and return 404 instead. The OCI spec
1708    /// defines a fallback: the referrers index is stored as a regular OCI Image Index
1709    /// under a tag derived from the subject digest by replacing `:` with `-`
1710    /// (e.g. `sha256:abc…` → tag `sha256-abc…`).
1711    ///
1712    /// When the fallback is used, `artifact_type` filtering is applied client-side,
1713    /// since the tag schema stores a single unfiltered index with no query-parameter
1714    /// support.
1715    ///
1716    /// If both the native API and the tag schema fail, an empty `OciImageIndex` is
1717    /// returned, as per the spec recommendation.
1718    ///
1719    /// [oci-referrers]: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#listing-referrers
1720    /// [oci-tag-schema]: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#referrers-tag-schema
1721    pub async fn pull_referrers(
1722        &self,
1723        image: &Reference,
1724        artifact_type: Option<&str>,
1725    ) -> Result<OciImageIndex> {
1726        let url = self.to_v2_referrers_url(image, artifact_type)?;
1727        debug!("Pulling referrers from {}", url);
1728
1729        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1730            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1731            .apply_auth(image, RegistryOperation::Pull)
1732            .await?
1733            .into_request_builder()
1734            .send()
1735            .await?;
1736        let status = res.status();
1737        let body = res.bytes().await?;
1738
1739        // Per the OCI Distribution Spec, a 404 on the native referrers endpoint means the
1740        // registry does not support it; fall back to the referrers tag schema.
1741        if status == reqwest::StatusCode::NOT_FOUND {
1742            debug!(
1743                url = %url,
1744                "Native referrers API returned 404; falling back to OCI referrers tag schema"
1745            );
1746            return self
1747                .pull_referrers_via_tag_schema(image, artifact_type)
1748                .await;
1749        }
1750
1751        validate_registry_response(status, &body, &url)?;
1752        let manifest = serde_json::from_slice(&body)
1753            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1754
1755        Ok(manifest)
1756    }
1757
1758    /// Pulls the referrers index using the OCI referrers tag schema fallback.
1759    ///
1760    /// The tag is the subject digest with `:` replaced by `-`
1761    /// (e.g. `sha256:abc…` → `sha256-abc…`).
1762    ///
1763    /// If `artifact_type` is provided, the returned index is filtered client-side
1764    /// to include only entries whose `artifact_type` matches.
1765    ///
1766    /// If the tag does not exist or does not contain a valid image index, an empty
1767    /// `OciImageIndex` is returned as per the OCI spec recommendation.
1768    async fn pull_referrers_via_tag_schema(
1769        &self,
1770        image: &Reference,
1771        artifact_type: Option<&str>,
1772    ) -> Result<OciImageIndex> {
1773        let digest = image.digest().ok_or_else(|| {
1774            OciDistributionError::GenericError(Some(
1775                "Getting referrers for a tag is not supported".into(),
1776            ))
1777        })?;
1778
1779        let fallback_tag = digest.replace(':', "-");
1780        let fallback_ref = Reference::with_tag(
1781            image.resolve_registry().to_string(),
1782            image.repository().to_string(),
1783            fallback_tag.clone(),
1784        );
1785
1786        debug!(
1787            tag = %fallback_tag,
1788            "Pulling referrers via tag schema"
1789        );
1790
1791        let manifest = match self._pull_manifest(&fallback_ref).await {
1792            Ok((manifest, _digest)) => manifest,
1793            Err(e) => match &e {
1794                OciDistributionError::ImageManifestNotFoundError(_)
1795                | OciDistributionError::RegistryError { .. }
1796                | OciDistributionError::ServerError { code: 404, .. } => {
1797                    debug!(
1798                        error = ?e,
1799                        "Referrers tag schema not found; assuming no referrers"
1800                    );
1801                    return Ok(empty_image_index());
1802                }
1803                _ => return Err(e),
1804            },
1805        };
1806
1807        let mut index = match manifest {
1808            OciManifest::ImageIndex(idx) => idx,
1809            OciManifest::Image(_) => {
1810                return Err(OciDistributionError::SpecViolationError(format!(
1811                    "referrers tag schema: tag '{fallback_tag}' contains an Image manifest; \
1812                     expected an OCI Image Index"
1813                )));
1814            }
1815        };
1816
1817        // Apply client-side artifact_type filtering when requested, since the tag
1818        // schema stores a single unfiltered index.
1819        if let Some(at) = artifact_type {
1820            index.manifests.retain(|entry| {
1821                entry
1822                    .artifact_type
1823                    .as_deref()
1824                    .map(|t| t == at)
1825                    .unwrap_or(false)
1826            });
1827        }
1828
1829        Ok(index)
1830    }
1831
1832    /// Lists available repositories in the registry.
1833    ///
1834    /// Implements the OCI Distribution Spec catalog endpoint (`/v2/_catalog`).
1835    /// Supports pagination via `n` (page size) and `last` (last repo from
1836    /// previous page).
1837    pub async fn catalog(
1838        &self,
1839        image: &Reference,
1840        auth: &RegistryAuth,
1841        n: Option<usize>,
1842        last: Option<&str>,
1843    ) -> Result<CatalogResponse> {
1844        let op = RegistryOperation::Pull;
1845        let url = self.to_catalog_url(image);
1846
1847        self.store_auth_if_needed(image.resolve_registry(), auth)
1848            .await;
1849
1850        let request = self.client.get(&url);
1851        let request = if let Some(num) = n {
1852            request.query(&[("n", num)])
1853        } else {
1854            request
1855        };
1856        let request = if let Some(l) = last {
1857            request.query(&[("last", l)])
1858        } else {
1859            request
1860        };
1861        let request = RequestBuilderWrapper {
1862            client: self,
1863            request_builder: request,
1864        };
1865        let res = request
1866            .apply_auth(image, op)
1867            .await?
1868            .into_request_builder()
1869            .send()
1870            .await?;
1871        let status = res.status();
1872        let body = res.bytes().await?;
1873
1874        validate_registry_response(status, &body, &url)?;
1875
1876        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
1877    }
1878
1879    async fn extract_location_header(
1880        &self,
1881        image: &Reference,
1882        res: reqwest::Response,
1883        expected_status: &reqwest::StatusCode,
1884    ) -> Result<String> {
1885        debug!(expected_status_code=?expected_status.as_u16(),
1886            status_code=?res.status().as_u16(),
1887            "extract location header");
1888        if res.status().eq(expected_status) {
1889            let location_header = res.headers().get("Location");
1890            debug!(location=?location_header, "Location header");
1891            match location_header {
1892                None => Err(OciDistributionError::RegistryNoLocationError),
1893                Some(lh) => self.location_header_to_url(image, lh),
1894            }
1895        } else if res.status().is_success() && expected_status.is_success() {
1896            Err(OciDistributionError::SpecViolationError(format!(
1897                "Expected HTTP Status {}, got {} instead",
1898                expected_status,
1899                res.status(),
1900            )))
1901        } else {
1902            let url = res.url().to_string();
1903            let code = res.status().as_u16();
1904            let message = res.text().await?;
1905            Err(OciDistributionError::ServerError { url, code, message })
1906        }
1907    }
1908
1909    /// Helper function to convert location header to URL
1910    ///
1911    /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path)
1912    /// Returns a properly formatted absolute URL
1913    fn location_header_to_url(
1914        &self,
1915        image: &Reference,
1916        location_header: &reqwest::header::HeaderValue,
1917    ) -> Result<String> {
1918        let lh = location_header.to_str()?;
1919        if lh.starts_with("/") {
1920            let registry = image.resolve_registry();
1921            Ok(format!(
1922                "{scheme}://{registry}{lh}",
1923                scheme = self.config.protocol.scheme_for(registry)
1924            ))
1925        } else {
1926            Ok(lh.to_string())
1927        }
1928    }
1929
1930    /// Convert a Reference to a v2 manifest URL.
1931    fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1932        let registry = reference.resolve_registry();
1933        format!(
1934            "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1935            scheme = self.config.protocol.scheme_for(registry),
1936            repository = reference.repository(),
1937            reference = if let Some(digest) = reference.digest() {
1938                digest
1939            } else {
1940                reference.tag().unwrap_or("latest")
1941            },
1942            ns = reference
1943                .namespace()
1944                .map(|ns| format!("?ns={ns}"))
1945                .unwrap_or_default(),
1946        )
1947    }
1948
1949    /// Convert a Reference to a v2 blob (layer) URL.
1950    fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1951        let registry = reference.resolve_registry();
1952        format!(
1953            "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1954            scheme = self.config.protocol.scheme_for(registry),
1955            repository = reference.repository(),
1956            ns = reference
1957                .namespace()
1958                .map(|ns| format!("?ns={ns}"))
1959                .unwrap_or_default(),
1960        )
1961    }
1962
    /// Convert a Reference to a v2 blob upload URL.
    ///
    /// Delegates to `to_v2_blob_url`, passing `uploads/` where a digest would
    /// normally go, so the path ends in `/blobs/uploads/` (followed by the `?ns=`
    /// query when the reference carries a namespace).
    fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
        self.to_v2_blob_url(reference, "uploads/")
    }
1967
1968    fn to_list_tags_url(&self, reference: &Reference) -> String {
1969        let registry = reference.resolve_registry();
1970        format!(
1971            "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1972            scheme = self.config.protocol.scheme_for(registry),
1973            repository = reference.repository(),
1974            ns = reference
1975                .namespace()
1976                .map(|ns| format!("?ns={ns}"))
1977                .unwrap_or_default(),
1978        )
1979    }
1980
1981    fn to_catalog_url(&self, reference: &Reference) -> String {
1982        let registry = reference.resolve_registry();
1983        format!(
1984            "{scheme}://{registry}/v2/_catalog",
1985            scheme = self.config.protocol.scheme_for(registry),
1986        )
1987    }
1988
1989    /// Convert a Reference to a v2 manifest URL.
1990    fn to_v2_referrers_url(
1991        &self,
1992        reference: &Reference,
1993        artifact_type: Option<&str>,
1994    ) -> Result<String> {
1995        let registry = reference.resolve_registry();
1996        Ok(format!(
1997            "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1998            scheme = self.config.protocol.scheme_for(registry),
1999            repository = reference.repository(),
2000            reference = if let Some(digest) = reference.digest() {
2001                digest
2002            } else {
2003                return Err(OciDistributionError::GenericError(Some(
2004                    "Getting referrers for a tag is not supported".into(),
2005                )));
2006            },
2007            at = artifact_type
2008                .map(|at| format!("?artifactType={at}"))
2009                .unwrap_or_default(),
2010        ))
2011    }
2012}
2013
2014/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404.
2015/// Obviously, HTTP servers are going to send other codes. This tries to catch the
2016/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error.
2017fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
2018    match status {
2019        reqwest::StatusCode::OK => Ok(()),
2020        reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
2021            url: url.to_string(),
2022        }),
2023        s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
2024            "Expected HTTP Status {}, got {} instead",
2025            reqwest::StatusCode::OK,
2026            status,
2027        ))),
2028        s if s.is_client_error() => {
2029            match serde_json::from_slice::<OciEnvelope>(body) {
2030                // According to the OCI spec, we should see an error in the message body.
2031                Ok(envelope) => Err(OciDistributionError::RegistryError {
2032                    envelope,
2033                    url: url.to_string(),
2034                }),
2035                // Fall back to a plain server error if the body isn't a valid `OciEnvelope`
2036                Err(_) => Err(OciDistributionError::ServerError {
2037                    code: s.as_u16(),
2038                    url: url.to_string(),
2039                    message: String::from_utf8_lossy(body).to_string(),
2040                }),
2041            }
2042        }
2043        s => {
2044            let text = std::str::from_utf8(body)?;
2045
2046            Err(OciDistributionError::ServerError {
2047                code: s.as_u16(),
2048                url: url.to_string(),
2049                message: text.to_string(),
2050            })
2051        }
2052    }
2053}
2054
2055/// Returns an empty OCI Image Index, as used when no referrers exist.
2056fn empty_image_index() -> OciImageIndex {
2057    OciImageIndex {
2058        schema_version: 2,
2059        media_type: Some(crate::manifest::OCI_IMAGE_INDEX_MEDIA_TYPE.to_string()),
2060        artifact_type: None,
2061        annotations: None,
2062        manifests: vec![],
2063    }
2064}
2065
2066/// Converts a response into a stream
2067fn stream_from_response(
2068    response: Response,
2069    layer: impl AsLayerDescriptor,
2070    verify: bool,
2071) -> Result<SizedStream> {
2072    let content_length = response.content_length();
2073    let headers = response.headers().clone();
2074    let stream = response
2075        .error_for_status()?
2076        .bytes_stream()
2077        .map_err(std::io::Error::other);
2078
2079    let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
2080    let layer_digester = Digester::new(&expected_layer_digest)?;
2081    let header_digester_and_digest = match digest_header_value(headers)? {
2082        // If the digests match, we don't need to do both digesters
2083        Some(digest) if digest == expected_layer_digest => None,
2084        Some(digest) => Some((Digester::new(&digest)?, digest)),
2085        None => None,
2086    };
2087    let header_digest = header_digester_and_digest
2088        .as_ref()
2089        .map(|(_, digest)| digest.to_owned());
2090    let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
2091        Box::pin(VerifyingStream::new(
2092            Box::pin(stream),
2093            layer_digester,
2094            expected_layer_digest,
2095            header_digester_and_digest,
2096        ))
2097    } else {
2098        Box::pin(stream)
2099    };
2100    Ok(SizedStream {
2101        content_length,
2102        digest_header_value: header_digest,
2103        stream,
2104    })
2105}
2106
/// A wrapper that pairs a `RequestBuilder` with the `Client` it was created from.
///
/// It is instantiated from a `Client` and offers composable operations on the
/// request builder (such as applying the `Accept` header or authentication), to
/// finally produce a `RequestBuilder` object that can be executed.
struct RequestBuilderWrapper<'a> {
    // The client the request belongs to; used to look up authentication tokens.
    client: &'a Client,
    // The request being assembled.
    request_builder: RequestBuilder,
}
2114
2115// RequestBuilderWrapper type management
2116impl<'a> RequestBuilderWrapper<'a> {
2117    /// Create a `RequestBuilderWrapper` from a `Client` instance, by
2118    /// instantiating the internal `RequestBuilder` with the provided
2119    /// function `f`.
2120    fn from_client(
2121        client: &'a Client,
2122        f: impl Fn(&reqwest::Client) -> RequestBuilder,
2123    ) -> RequestBuilderWrapper<'a> {
2124        let request_builder = f(&client.client);
2125        RequestBuilderWrapper {
2126            client,
2127            request_builder,
2128        }
2129    }
2130
2131    // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper`
2132    fn into_request_builder(self) -> RequestBuilder {
2133        self.request_builder
2134    }
2135}
2136
2137// Composable functions applicable to a `RequestBuilderWrapper`
2138impl<'a> RequestBuilderWrapper<'a> {
2139    fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper<'_>> {
2140        let request_builder = self
2141            .request_builder
2142            .try_clone()
2143            .ok_or_else(|| {
2144                OciDistributionError::GenericError(Some(
2145                    "could not clone request builder".to_string(),
2146                ))
2147            })?
2148            .header("Accept", Vec::from(accept).join(", "));
2149
2150        Ok(RequestBuilderWrapper {
2151            client: self.client,
2152            request_builder,
2153        })
2154    }
2155
2156    /// Updates request as necessary for authentication.
2157    ///
2158    /// If the struct has Some(bearer), this will insert the bearer token in an
2159    /// Authorization header. It will also set the Accept header, which must
2160    /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth
2161    /// credentials, these will be configured.
2162    async fn apply_auth(
2163        &self,
2164        image: &Reference,
2165        op: RegistryOperation,
2166    ) -> Result<RequestBuilderWrapper<'_>> {
2167        let mut headers = HeaderMap::new();
2168
2169        if let Some(token) = self.client.get_auth_token(image, op).await {
2170            match token {
2171                RegistryTokenType::Bearer(token) => {
2172                    debug!("Using bearer token authentication.");
2173                    headers.insert("Authorization", token.bearer_token().parse().unwrap());
2174                }
2175                RegistryTokenType::Basic(username, password) => {
2176                    debug!("Using HTTP basic authentication.");
2177                    return Ok(RequestBuilderWrapper {
2178                        client: self.client,
2179                        request_builder: self
2180                            .request_builder
2181                            .try_clone()
2182                            .ok_or_else(|| {
2183                                OciDistributionError::GenericError(Some(
2184                                    "could not clone request builder".to_string(),
2185                                ))
2186                            })?
2187                            .headers(headers)
2188                            .basic_auth(username.to_string(), Some(password.to_string())),
2189                    });
2190                }
2191            }
2192        }
2193        Ok(RequestBuilderWrapper {
2194            client: self.client,
2195            request_builder: self
2196                .request_builder
2197                .try_clone()
2198                .ok_or_else(|| {
2199                    OciDistributionError::GenericError(Some(
2200                        "could not clone request builder".to_string(),
2201                    ))
2202                })?
2203                .headers(headers),
2204        })
2205    }
2206}
2207
/// The encoding of the certificate
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CertificateEncoding {
    /// DER encoding; the certificate data is parsed with
    /// `reqwest::Certificate::from_der`.
    Der,
    /// PEM encoding; the certificate data is parsed with
    /// `reqwest::Certificate::from_pem`.
    Pem,
}
2216
/// An x509 certificate, stored as raw bytes together with the encoding of
/// those bytes.
#[derive(Debug, Clone)]
pub struct Certificate {
    /// Which encoding is used by the certificate
    pub encoding: CertificateEncoding,

    /// Actual certificate: the raw bytes, in the format named by `encoding`
    pub data: Vec<u8>,
}
2226
2227impl TryFrom<&Certificate> for reqwest::Certificate {
2228    type Error = OciDistributionError;
2229
2230    fn try_from(cert: &Certificate) -> Result<Self> {
2231        match cert.encoding {
2232            CertificateEncoding::Der => Ok(reqwest::Certificate::from_der(cert.data.as_slice())?),
2233            CertificateEncoding::Pem => Ok(reqwest::Certificate::from_pem(cert.data.as_slice())?),
2234        }
2235    }
2236}
2237
2238fn convert_certificates(certs: &[Certificate]) -> Result<Vec<reqwest::Certificate>> {
2239    certs.iter().map(reqwest::Certificate::try_from).collect()
2240}
2241
/// A client configuration
///
/// All fields are public. The `Default` implementation provides a value for
/// every field, so a configuration is usually built with struct update syntax
/// on top of `ClientConfig::default()`.
pub struct ClientConfig {
    /// Which protocol the client should use. Defaults to the default
    /// [`ClientProtocol`].
    pub protocol: ClientProtocol,

    /// Accept invalid hostnames. Defaults to false.
    ///
    /// Only available when the `native-tls` feature is enabled.
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,

    /// Accept invalid certificates. Defaults to false
    pub accept_invalid_certificates: bool,

    /// Use monolithic push for pushing blobs. Defaults to false
    pub use_monolithic_push: bool,

    /// Use only the provided certificate roots.
    ///
    /// This option disables any native or built-in roots, and **only** uses
    /// the roots provided in this list. Defaults to an empty list.
    pub tls_certs_only: Vec<Certificate>,

    /// A list of extra root certificates to trust. This can be used to connect
    /// to servers using self-signed certificates. Defaults to an empty list.
    pub extra_root_certificates: Vec<Certificate>,

    /// A function that defines the client's behaviour if an Image Index Manifest
    /// (i.e Manifest List) is encountered when pulling an image.
    /// Defaults to [`current_platform_resolver`],
    /// which attempts to choose an image matching the running OS and Arch.
    ///
    /// If set to None, an error is raised if an Image Index manifest is received
    /// during an image pull.
    pub platform_resolver: Option<Box<PlatformResolverFn>>,

    /// Maximum number of concurrent uploads to perform during a `push`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
    pub max_concurrent_upload: usize,

    /// Maximum number of concurrent downloads to perform during a `pull`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
    pub max_concurrent_download: usize,

    /// Default token expiration in seconds, to use when the token claim
    /// doesn't provide a value.
    ///
    /// This defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`].
    pub default_token_expiration_secs: usize,

    /// Enables a read timeout for the client. Defaults to `None`.
    ///
    /// See [`reqwest::ClientBuilder::read_timeout`] for more information.
    pub read_timeout: Option<Duration>,

    /// Set a timeout for the connect phase for the client. Defaults to `None`.
    ///
    /// See [`reqwest::ClientBuilder::connect_timeout`] for more information.
    pub connect_timeout: Option<Duration>,

    /// Set the `User-Agent` used by the client.
    ///
    /// This defaults to `oci-client/<version>` where `<version>` is the crate version.
    pub user_agent: &'static str,

    /// Set the HTTPS proxy used by the client.
    ///
    /// This defaults to `None`.
    pub https_proxy: Option<String>,

    /// Set the HTTP proxy used by the client.
    ///
    /// This defaults to `None`.
    pub http_proxy: Option<String>,

    /// Set the no-proxy setting used by the client.
    ///
    /// This defaults to `None`.
    pub no_proxy: Option<String>,
}
2324
impl Default for ClientConfig {
    /// Builds the default configuration: default protocol, strict TLS validation,
    /// chunked (non-monolithic) pushes, no custom certificates, the current
    /// platform resolver, the default concurrency limits, token expiration and
    /// user agent, and neither timeouts nor proxies.
    fn default() -> Self {
        Self {
            protocol: ClientProtocol::default(),
            #[cfg(feature = "native-tls")]
            accept_invalid_hostnames: false,
            accept_invalid_certificates: false,
            use_monolithic_push: false,
            tls_certs_only: Vec::new(),
            extra_root_certificates: Vec::new(),
            // Pick the image index entry that matches the running OS and architecture.
            platform_resolver: Some(Box::new(current_platform_resolver)),
            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
            default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
            read_timeout: None,
            connect_timeout: None,
            user_agent: DEFAULT_USER_AGENT,
            https_proxy: None,
            http_proxy: None,
            no_proxy: None,
        }
    }
}
2348
// Signature of a platform resolver: it receives the entries of an image index
// and returns the digest of the entry to use, or `None` when no entry is suitable.
//
// Be explicit about the traits supported by this type (`Send + Sync`). This is
// needed to use the Client behind a dynamic reference.
// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
2353
2354/// A platform resolver that chooses the first linux/amd64 variant, if present
2355pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2356    manifests
2357        .iter()
2358        .find(|entry| {
2359            entry.platform.as_ref().is_some_and(|platform| {
2360                platform.os == Os::Linux && platform.architecture == Arch::Amd64
2361            })
2362        })
2363        .map(|entry| entry.digest.clone())
2364}
2365
2366/// A platform resolver that chooses the first windows/amd64 variant, if present
2367pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2368    manifests
2369        .iter()
2370        .find(|entry| {
2371            entry.platform.as_ref().is_some_and(|platform| {
2372                platform.os == Os::Windows && platform.architecture == Arch::Amd64
2373            })
2374        })
2375        .map(|entry| entry.digest.clone())
2376}
2377
2378/// A platform resolver that chooses the first variant matching the running OS/Arch, if present.
2379/// Doesn't currently handle platform.variants.
2380pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2381    manifests
2382        .iter()
2383        .find(|entry| {
2384            entry.platform.as_ref().is_some_and(|platform| {
2385                platform.os == Os::default() && platform.architecture == Arch::default()
2386            })
2387        })
2388        .map(|entry| entry.digest.clone())
2389}
2390
/// The protocol that the client should use to connect
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use plain `http` for every registry.
    Http,
    /// Use `https` for every registry. This is the default.
    #[default]
    Https,
    /// Use `https` for every registry except the listed ones, which are contacted over
    /// plain `http`. Entries are compared with the registry string exactly as given.
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    ///
    /// For [`ClientProtocol::HttpsExcept`] the result is `"http"` only when `registry` is
    /// equal to one of the listed exceptions; otherwise it is `"https"`.
    fn scheme_for(&self, registry: &str) -> &str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare borrowed strings directly; no temporary `String` is needed just
                // to test membership.
                if exceptions.iter().any(|exception| exception == registry) {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2418
/// A parsed `Bearer` authentication challenge.
///
/// Values are built through the `TryFrom<&HeaderValue>` and `TryFrom<&ChallengeRef<'_>>`
/// conversions in this file.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// Unescaped value of the challenge's `realm` parameter. The conversion fails when the
    /// parameter is absent.
    pub realm: Box<str>,
    /// Unescaped value of the challenge's `service` parameter, if one was supplied.
    pub service: Option<String>,
}
2424
2425impl TryFrom<&HeaderValue> for BearerChallenge {
2426    type Error = String;
2427
2428    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2429        let parser = ChallengeParser::new(
2430            value
2431                .to_str()
2432                .map_err(|e| format!("cannot convert header value to string: {e:?}"))?,
2433        );
2434        parser
2435            .filter_map(|parser_res| {
2436                if let Ok(chalenge_ref) = parser_res {
2437                    let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2438                    bearer_challenge.ok()
2439                } else {
2440                    None
2441                }
2442            })
2443            .next()
2444            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2445    }
2446}
2447
2448impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2449    type Error = String;
2450
2451    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2452        if !value.scheme.eq_ignore_ascii_case("Bearer") {
2453            return Err(format!(
2454                "BearerChallenge doesn't support challenge scheme {:?}",
2455                value.scheme
2456            ));
2457        }
2458        let mut realm = None;
2459        let mut service = None;
2460        for (k, v) in &value.params {
2461            if k.eq_ignore_ascii_case("realm") {
2462                realm = Some(v.to_unescaped());
2463            }
2464
2465            if k.eq_ignore_ascii_case("service") {
2466                service = Some(v.to_unescaped());
2467            }
2468        }
2469
2470        let realm = realm.ok_or("missing required parameter realm")?;
2471
2472        Ok(BearerChallenge {
2473            realm: realm.into_boxed_str(),
2474            service,
2475        })
2476    }
2477}
2478
2479#[cfg(test)]
2480mod test {
2481    use super::*;
2482    use std::convert::TryFrom;
2483    use std::fs;
2484    use std::path;
2485    use std::result::Result;
2486
2487    use bytes::Bytes;
2488    use rstest::rstest;
2489    use sha2::Digest as _;
2490    use tempfile::TempDir;
2491    use tokio::io::AsyncReadExt;
2492    use tokio_util::io::StreamReader;
2493
2494    use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2495
2496    #[cfg(feature = "test-registry")]
2497    use testcontainers::{
2498        core::{Mount, WaitFor},
2499        runners::AsyncRunner,
2500        ContainerRequest, GenericImage, ImageExt,
2501    };
2502
    // The same hello-wasm image referenced four ways: without a tag, by tag, by digest, and
    // by tag plus digest.
    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
    // References the tests below loop over.
    const TEST_IMAGES: &[&str] = &[
        // TODO(jlegrone): this image cannot be pulled currently because no `latest`
        //                 tag exists on the image repository. Re-enable this image
        //                 in tests once `latest` is published.
        // HELLO_IMAGE_NO_TAG,
        HELLO_IMAGE_TAG,
        HELLO_IMAGE_DIGEST,
        HELLO_IMAGE_TAG_AND_DIGEST,
    ];
    // Additional image references hosted on other registries.
    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
    // htpasswd-style entry for `HTPASSWD_USERNAME`.
    // NOTE(review): presumably the hash of `HTPASSWD_PASSWORD` — confirm against the
    // registry test setup.
    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
    const HTPASSWD_USERNAME: &str = "testuser";
    const HTPASSWD_PASSWORD: &str = "testpassword";

    // An empty JSON object and its sha256 digest.
    const EMPTY_JSON_BLOB: &str = "{}";
    const EMPTY_JSON_DIGEST: &str =
        "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
2525
2526    #[test]
2527    fn test_apply_accept() -> anyhow::Result<()> {
2528        assert_eq!(
2529            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2530                .get("https://example.com/some/module.wasm"))
2531            .apply_accept(&["*/*"])?
2532            .into_request_builder()
2533            .build()?
2534            .headers()["Accept"],
2535            "*/*"
2536        );
2537
2538        assert_eq!(
2539            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2540                .get("https://example.com/some/module.wasm"))
2541            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2542            .into_request_builder()
2543            .build()?
2544            .headers()["Accept"],
2545            MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2546        );
2547
2548        Ok(())
2549    }
2550
2551    #[tokio::test]
2552    async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2553        assert!(
2554            !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2555                .get("https://example.com/some/module.wasm"))
2556            .apply_auth(
2557                &Reference::try_from(HELLO_IMAGE_TAG)?,
2558                RegistryOperation::Pull
2559            )
2560            .await?
2561            .into_request_builder()
2562            .build()?
2563            .headers()
2564            .contains_key("Authorization")
2565        );
2566
2567        Ok(())
2568    }
2569
    /// Field-less claims set, used by `test_apply_auth_bearer_token` as the payload of the
    /// JWT it encodes.
    #[derive(Serialize)]
    struct EmptyClaims {}
2572
2573    #[tokio::test]
2574    async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2575        crate::test_helpers::jsonwebtoken_install_default_crypto_provider();
2576        let _ = tracing_subscriber::fmt::try_init();
2577        let client = Client::default();
2578        let header = jsonwebtoken::Header::default();
2579        let claims = EmptyClaims {};
2580        let key = jsonwebtoken::EncodingKey::from_secret(b"some-secret");
2581        let token = jsonwebtoken::encode(&header, &claims, &key)?;
2582
2583        // we have to have it in the stored auth so we'll get to the token cache check.
2584        client
2585            .store_auth(
2586                Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2587                RegistryAuth::Anonymous,
2588            )
2589            .await;
2590
2591        client
2592            .tokens
2593            .insert(
2594                &Reference::try_from(HELLO_IMAGE_TAG)?,
2595                RegistryOperation::Pull,
2596                RegistryTokenType::Bearer(RegistryToken::Token {
2597                    token: token.clone(),
2598                }),
2599            )
2600            .await;
2601
2602        assert_eq!(
2603            RequestBuilderWrapper::from_client(&client, |client| client
2604                .get("https://example.com/some/module.wasm"))
2605            .apply_auth(
2606                &Reference::try_from(HELLO_IMAGE_TAG)?,
2607                RegistryOperation::Pull
2608            )
2609            .await?
2610            .into_request_builder()
2611            .build()?
2612            .headers()["Authorization"],
2613            format!("Bearer {}", &token)
2614        );
2615
2616        Ok(())
2617    }
2618
2619    #[test]
2620    fn test_to_v2_blob_url() {
2621        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2622        let c = Client::default();
2623
2624        assert_eq!(
2625            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2626            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2627        );
2628
2629        image.set_mirror_registry("docker.mirror.io".to_owned());
2630        assert_eq!(
2631            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2632            "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2633        );
2634    }
2635
2636    #[rstest(image, expected_uri, expected_mirror_uri,
2637        case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), // TODO: confirm this is the right translation when no tag
2638        case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2639        case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2640        case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2641    )]
2642    fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2643        let mut reference = Reference::try_from(image).expect("failed to parse reference");
2644        let c = Client::default();
2645        assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2646
2647        reference.set_mirror_registry("docker.mirror.io".to_owned());
2648        assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2649    }
2650
2651    #[test]
2652    fn test_to_v2_blob_upload_url() {
2653        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2654        let blob_url = Client::default().to_v2_blob_upload_url(&image);
2655
2656        assert_eq!(
2657            blob_url,
2658            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2659        )
2660    }
2661
2662    #[test]
2663    fn test_to_list_tags_url() {
2664        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2665        let c = Client::default();
2666
2667        assert_eq!(
2668            c.to_list_tags_url(&image),
2669            "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2670        );
2671
2672        image.set_mirror_registry("docker.mirror.io".to_owned());
2673        assert_eq!(
2674            c.to_list_tags_url(&image),
2675            "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2676        );
2677    }
2678
2679    #[test]
2680    fn test_to_catalog_url() {
2681        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2682        let c = Client::default();
2683
2684        assert_eq!(
2685            c.to_catalog_url(&image),
2686            "https://webassembly.azurecr.io/v2/_catalog"
2687        );
2688
2689        image.set_mirror_registry("docker.mirror.io".to_owned());
2690        assert_eq!(
2691            c.to_catalog_url(&image),
2692            "https://docker.mirror.io/v2/_catalog"
2693        );
2694    }
2695
2696    #[test]
2697    fn manifest_url_generation_respects_http_protocol() {
2698        let c = Client::new(ClientConfig {
2699            protocol: ClientProtocol::Http,
2700            ..Default::default()
2701        });
2702        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2703            .expect("Could not parse reference");
2704        assert_eq!(
2705            "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2706            c.to_v2_manifest_url(&reference)
2707        );
2708    }
2709
2710    #[test]
2711    fn blob_url_generation_respects_http_protocol() {
2712        let c = Client::new(ClientConfig {
2713            protocol: ClientProtocol::Http,
2714            ..Default::default()
2715        });
2716        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2717            .expect("Could not parse reference");
2718        assert_eq!(
2719            "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2720            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2721        );
2722    }
2723
2724    #[test]
2725    fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2726        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2727        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2728        let c = Client::new(ClientConfig {
2729            protocol,
2730            ..Default::default()
2731        });
2732        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2733            .expect("Could not parse reference");
2734        assert_eq!(
2735            "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2736            c.to_v2_manifest_url(&reference)
2737        );
2738    }
2739
2740    #[test]
2741    fn manifest_url_generation_uses_http_if_on_exception_list() {
2742        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2743        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2744        let c = Client::new(ClientConfig {
2745            protocol,
2746            ..Default::default()
2747        });
2748        let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2749            .expect("Could not parse reference");
2750        assert_eq!(
2751            "http://oci.registry.local/v2/hello/manifests/v1",
2752            c.to_v2_manifest_url(&reference)
2753        );
2754    }
2755
2756    #[test]
2757    fn blob_url_generation_uses_https_if_not_on_exception_list() {
2758        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2759        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2760        let c = Client::new(ClientConfig {
2761            protocol,
2762            ..Default::default()
2763        });
2764        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2765            .expect("Could not parse reference");
2766        assert_eq!(
2767            "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2768            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2769        );
2770    }
2771
2772    #[test]
2773    fn blob_url_generation_uses_http_if_on_exception_list() {
2774        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2775        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2776        let c = Client::new(ClientConfig {
2777            protocol,
2778            ..Default::default()
2779        });
2780        let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2781            .expect("Could not parse reference");
2782        assert_eq!(
2783            "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2784            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2785        );
2786    }
2787
2788    #[test]
2789    fn can_generate_valid_digest() {
2790        let bytes = b"hellobytes";
2791        let hash = sha256_digest(bytes);
2792
2793        let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2794        let combination_hash =
2795            sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2796
2797        assert_eq!(
2798            hash,
2799            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2800        );
2801        assert_eq!(
2802            combination_hash,
2803            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2804        );
2805    }
2806
2807    #[test]
2808    fn test_registry_token_deserialize() {
2809        // 'token' field, standalone
2810        let text = r#"{"token": "abc"}"#;
2811        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2812        assert!(res.is_ok());
2813        let rt = res.unwrap();
2814        assert_eq!(rt.token(), "abc");
2815
2816        // 'access_token' field, standalone
2817        let text = r#"{"access_token": "xyz"}"#;
2818        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2819        assert!(res.is_ok());
2820        let rt = res.unwrap();
2821        assert_eq!(rt.token(), "xyz");
2822
2823        // both 'token' and 'access_token' fields, 'token' field takes precedence
2824        let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2825        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2826        assert!(res.is_ok());
2827        let rt = res.unwrap();
2828        assert_eq!(rt.token(), "abc");
2829
2830        // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order)
2831        let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2832        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2833        assert!(res.is_ok());
2834        let rt = res.unwrap();
2835        assert_eq!(rt.token(), "abc");
2836
2837        // non-string fields do not break parsing
2838        let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2839        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2840        assert!(res.is_ok());
2841
2842        // Note: tokens should always be strings. The next two tests ensure that if one field
2843        // is invalid (integer), then parse can still succeed if the other field is a string.
2844        //
2845        // numeric 'access_token' field, but string 'token' field does not in parse error
2846        let text = r#"{"access_token": 300, "token": "abc"}"#;
2847        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2848        assert!(res.is_ok());
2849        let rt = res.unwrap();
2850        assert_eq!(rt.token(), "abc");
2851
2852        // numeric 'token' field, but string 'accesss_token' field does not in parse error
2853        let text = r#"{"access_token": "xyz", "token": 300}"#;
2854        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2855        assert!(res.is_ok());
2856        let rt = res.unwrap();
2857        assert_eq!(rt.token(), "xyz");
2858
2859        // numeric 'token' field results in parse error
2860        let text = r#"{"token": 300}"#;
2861        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2862        assert!(res.is_err());
2863
2864        // numeric 'access_token' field results in parse error
2865        let text = r#"{"access_token": 300}"#;
2866        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2867        assert!(res.is_err());
2868
2869        // object 'token' field results in parse error
2870        let text = r#"{"token": {"some": "thing"}}"#;
2871        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2872        assert!(res.is_err());
2873
2874        // object 'access_token' field results in parse error
2875        let text = r#"{"access_token": {"some": "thing"}}"#;
2876        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2877        assert!(res.is_err());
2878
2879        // missing fields results in parse error
2880        let text = r#"{"some": "thing"}"#;
2881        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2882        assert!(res.is_err());
2883
2884        // bad JSON results in parse error
2885        let text = r#"{"token": "abc""#;
2886        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2887        assert!(res.is_err());
2888
2889        // worse JSON results in parse error
2890        let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2891        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2892        assert!(res.is_err());
2893    }
2894
2895    fn check_auth_token(token: &str) {
2896        // We test that the token is longer than a minimal hash.
2897        assert!(token.len() > 64);
2898    }
2899
2900    #[tokio::test]
2901    async fn test_auth() {
2902        let _ = tracing_subscriber::fmt::try_init();
2903        for &image in TEST_IMAGES {
2904            let reference = Reference::try_from(image).expect("failed to parse reference");
2905            let c = Client::default();
2906            let token = c
2907                .auth(
2908                    &reference,
2909                    &RegistryAuth::Anonymous,
2910                    RegistryOperation::Pull,
2911                )
2912                .await
2913                .expect("result from auth request");
2914
2915            assert!(token.is_some());
2916            check_auth_token(token.unwrap().as_ref());
2917
2918            let tok = c
2919                .tokens
2920                .get(&reference, RegistryOperation::Pull)
2921                .await
2922                .expect("token is available");
2923            // We test that the token is longer than a minimal hash.
2924            if let RegistryTokenType::Bearer(tok) = tok {
2925                check_auth_token(tok.token());
2926            } else {
2927                panic!("Unexpeted Basic Auth Token");
2928            }
2929        }
2930    }
2931
2932    #[cfg(feature = "test-registry")]
2933    #[tokio::test]
2934    async fn test_list_tags() {
2935        let test_container = registry_image_edge()
2936            .start()
2937            .await
2938            .expect("Failed to start registry container");
2939        let port = test_container
2940            .get_host_port_ipv4(5000)
2941            .await
2942            .expect("Failed to get port");
2943        let auth =
2944            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2945
2946        let client = Client::new(ClientConfig {
2947            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2948            ..Default::default()
2949        });
2950
2951        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2952        client
2953            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2954            .await
2955            .expect("cannot authenticate against registry for pull operation");
2956
2957        let (manifest, _digest) = client
2958            ._pull_image_manifest(&image)
2959            .await
2960            .expect("failed to pull manifest");
2961
2962        let image_data = client
2963            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2964            .await
2965            .expect("failed to pull image");
2966
2967        for i in 0..=3 {
2968            let push_image: Reference = format!("localhost:{port}/hello-wasm:1.0.{i}")
2969                .parse()
2970                .unwrap();
2971            client
2972                .auth(&push_image, &auth, RegistryOperation::Push)
2973                .await
2974                .expect("authenticated");
2975            client
2976                .push(
2977                    &push_image,
2978                    &image_data.layers,
2979                    image_data.config.clone(),
2980                    &auth,
2981                    Some(manifest.clone()),
2982                )
2983                .await
2984                .expect("Failed to push Image");
2985        }
2986
2987        let image: Reference = format!("localhost:{port}/hello-wasm:1.0.1")
2988            .parse()
2989            .unwrap();
2990        let response = client
2991            .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2992            .await
2993            .expect("Cannot list Tags");
2994        assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2995    }
2996
2997    #[cfg(feature = "test-registry")]
2998    #[tokio::test]
2999    async fn test_catalog() {
3000        let test_container = registry_image_edge()
3001            .start()
3002            .await
3003            .expect("Failed to start registry container");
3004        let port = test_container
3005            .get_host_port_ipv4(5000)
3006            .await
3007            .expect("Failed to get port");
3008        let auth =
3009            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3010
3011        let client = Client::new(ClientConfig {
3012            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3013            ..Default::default()
3014        });
3015
3016        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3017        client
3018            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3019            .await
3020            .expect("cannot authenticate against registry for pull operation");
3021
3022        let (manifest, _digest) = client
3023            ._pull_image_manifest(&image)
3024            .await
3025            .expect("failed to pull manifest");
3026
3027        let image_data = client
3028            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3029            .await
3030            .expect("failed to pull image");
3031
3032        // Push to two different repositories
3033        for repo in &["hello-catalog-a", "hello-catalog-b"] {
3034            let push_image: Reference = format!("localhost:{port}/{repo}:latest").parse().unwrap();
3035            client
3036                .auth(&push_image, &auth, RegistryOperation::Push)
3037                .await
3038                .expect("authenticated");
3039            client
3040                .push(
3041                    &push_image,
3042                    &image_data.layers,
3043                    image_data.config.clone(),
3044                    &auth,
3045                    Some(manifest.clone()),
3046                )
3047                .await
3048                .expect("Failed to push Image");
3049        }
3050
3051        // Use any valid reference for the same registry to call catalog
3052        let catalog_ref: Reference = format!("localhost:{port}/hello-catalog-a:latest")
3053            .parse()
3054            .unwrap();
3055        let response = client
3056            .catalog(&catalog_ref, &RegistryAuth::Anonymous, None, None)
3057            .await
3058            .expect("Cannot list catalog");
3059        assert!(response
3060            .repositories
3061            .contains(&"hello-catalog-a".to_string()));
3062        assert!(response
3063            .repositories
3064            .contains(&"hello-catalog-b".to_string()));
3065
3066        // Test pagination: request 1 result at a time
3067        let page1 = client
3068            .catalog(&catalog_ref, &RegistryAuth::Anonymous, Some(1), None)
3069            .await
3070            .expect("Cannot list catalog page 1");
3071        assert_eq!(page1.repositories.len(), 1);
3072
3073        let page2 = client
3074            .catalog(
3075                &catalog_ref,
3076                &RegistryAuth::Anonymous,
3077                Some(1),
3078                Some(&page1.repositories[0]),
3079            )
3080            .await
3081            .expect("Cannot list catalog page 2");
3082        assert_eq!(page2.repositories.len(), 1);
3083        assert_ne!(page1.repositories[0], page2.repositories[0]);
3084    }
3085
3086    #[tokio::test]
3087    async fn test_pull_manifest_private() {
3088        for &image in TEST_IMAGES {
3089            let reference = Reference::try_from(image).expect("failed to parse reference");
3090            // Currently, pull_manifest does not perform Authz, so this will fail.
3091            let c = Client::default();
3092            c._pull_image_manifest(&reference)
3093                .await
3094                .expect_err("pull manifest should fail");
3095
3096            // But this should pass
3097            let c = Client::default();
3098            c.auth(
3099                &reference,
3100                &RegistryAuth::Anonymous,
3101                RegistryOperation::Pull,
3102            )
3103            .await
3104            .expect("authenticated");
3105            let (manifest, _) = c
3106                ._pull_image_manifest(&reference)
3107                .await
3108                .expect("pull manifest should not fail");
3109
3110            // The test on the manifest checks all fields. This is just a brief sanity check.
3111            assert_eq!(manifest.schema_version, 2);
3112            assert!(!manifest.layers.is_empty());
3113        }
3114    }
3115
3116    #[tokio::test]
3117    async fn test_pull_manifest_public() {
3118        for &image in TEST_IMAGES {
3119            let reference = Reference::try_from(image).expect("failed to parse reference");
3120            let c = Client::default();
3121            let (manifest, _) = c
3122                .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3123                .await
3124                .expect("pull manifest should not fail");
3125
3126            // The test on the manifest checks all fields. This is just a brief sanity check.
3127            assert_eq!(manifest.schema_version, 2);
3128            assert!(!manifest.layers.is_empty());
3129        }
3130    }
3131
3132    #[tokio::test]
3133    async fn pull_manifest_and_config_public() {
3134        for &image in TEST_IMAGES {
3135            let reference = Reference::try_from(image).expect("failed to parse reference");
3136            let c = Client::default();
3137            let (manifest, _, config) = c
3138                .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
3139                .await
3140                .expect("pull manifest and config should not fail");
3141
3142            // The test on the manifest checks all fields. This is just a brief sanity check.
3143            assert_eq!(manifest.schema_version, 2);
3144            assert!(!manifest.layers.is_empty());
3145            assert!(!config.is_empty());
3146        }
3147    }
3148
3149    #[tokio::test]
3150    async fn test_fetch_digest() {
3151        let c = Client::default();
3152
3153        for &image in TEST_IMAGES {
3154            let reference = Reference::try_from(image).expect("failed to parse reference");
3155            c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
3156                .await
3157                .expect("pull manifest should not fail");
3158
3159            // This should pass
3160            let reference = Reference::try_from(image).expect("failed to parse reference");
3161            let c = Client::default();
3162            c.auth(
3163                &reference,
3164                &RegistryAuth::Anonymous,
3165                RegistryOperation::Pull,
3166            )
3167            .await
3168            .expect("authenticated");
3169            let digest = c
3170                .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
3171                .await
3172                .expect("pull manifest should not fail");
3173
3174            assert_eq!(
3175                digest,
3176                "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
3177            );
3178        }
3179    }
3180
3181    #[tokio::test]
3182    async fn test_pull_blob() {
3183        let c = Client::default();
3184
3185        for &image in TEST_IMAGES {
3186            let reference = Reference::try_from(image).expect("failed to parse reference");
3187            c.auth(
3188                &reference,
3189                &RegistryAuth::Anonymous,
3190                RegistryOperation::Pull,
3191            )
3192            .await
3193            .expect("authenticated");
3194            let (manifest, _) = c
3195                ._pull_image_manifest(&reference)
3196                .await
3197                .expect("failed to pull manifest");
3198
3199            // Pull one specific layer
3200            let mut file: Vec<u8> = Vec::new();
3201            let layer0 = &manifest.layers[0];
3202
3203            // This call likes to flake, so we try it at least 5 times
3204            let mut last_error = None;
3205            for i in 1..6 {
3206                if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
3207                    println!("Got error on pull_blob call attempt {i}. Will retry in 1s: {e:?}");
3208                    last_error.replace(e);
3209                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
3210                } else {
3211                    last_error = None;
3212                    break;
3213                }
3214            }
3215
3216            if let Some(e) = last_error {
3217                panic!("Unable to pull layer: {e:?}");
3218            }
3219
3220            // The manifest says how many bytes we should expect.
3221            assert_eq!(file.len(), layer0.size as usize);
3222        }
3223    }
3224
3225    #[tokio::test]
3226    async fn test_pull_blob_stream() {
3227        let c = Client::default();
3228
3229        for &image in TEST_IMAGES {
3230            let reference = Reference::try_from(image).expect("failed to parse reference");
3231            c.auth(
3232                &reference,
3233                &RegistryAuth::Anonymous,
3234                RegistryOperation::Pull,
3235            )
3236            .await
3237            .expect("authenticated");
3238            let (manifest, _) = c
3239                ._pull_image_manifest(&reference)
3240                .await
3241                .expect("failed to pull manifest");
3242
3243            // Pull one specific layer
3244            let mut file: Vec<u8> = Vec::new();
3245            let layer0 = &manifest.layers[0];
3246
3247            let layer_stream = c
3248                .pull_blob_stream(&reference, layer0)
3249                .await
3250                .expect("failed to pull blob stream");
3251
3252            assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
3253            AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
3254                .await
3255                .unwrap();
3256
3257            // The manifest says how many bytes we should expect.
3258            assert_eq!(file.len(), layer0.size as usize);
3259        }
3260    }
3261
3262    #[tokio::test]
3263    async fn test_pull_blob_stream_partial() {
3264        let c = Client::default();
3265
3266        for &image in TEST_IMAGES {
3267            let reference = Reference::try_from(image).expect("failed to parse reference");
3268            c.auth(
3269                &reference,
3270                &RegistryAuth::Anonymous,
3271                RegistryOperation::Pull,
3272            )
3273            .await
3274            .expect("authenticated");
3275            let (manifest, _) = c
3276                ._pull_image_manifest(&reference)
3277                .await
3278                .expect("failed to pull manifest");
3279
3280            // Pull part of one specific layer
3281            let mut partial_file: Vec<u8> = Vec::new();
3282            let layer0 = &manifest.layers[0];
3283            let (offset, length) = (10, 6);
3284
3285            let partial_response = c
3286                .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
3287                .await
3288                .expect("failed to pull blob stream");
3289            let full_response = c
3290                .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
3291                .await
3292                .expect("failed to pull blob stream");
3293
3294            let layer_stream_partial = match partial_response {
3295                BlobResponse::Full(_stream) => panic!("expected partial response"),
3296                BlobResponse::Partial(stream) => stream,
3297            };
3298            assert_eq!(layer_stream_partial.content_length, Some(length));
3299            AsyncReadExt::read_to_end(
3300                &mut StreamReader::new(layer_stream_partial.stream),
3301                &mut partial_file,
3302            )
3303            .await
3304            .unwrap();
3305
3306            // Also pull the full layer into a separate file to compare with the partial.
3307            let mut full_file: Vec<u8> = Vec::new();
3308            let layer_stream_full = match full_response {
3309                BlobResponse::Full(_stream) => panic!("expected partial response"),
3310                BlobResponse::Partial(stream) => stream,
3311            };
3312            assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
3313            AsyncReadExt::read_to_end(
3314                &mut StreamReader::new(layer_stream_full.stream),
3315                &mut full_file,
3316            )
3317            .await
3318            .unwrap();
3319
3320            // The partial read length says how many bytes we should expect.
3321            assert_eq!(partial_file.len(), length as usize);
3322            // The manifest says how many bytes we should expect on a full read.
3323            assert_eq!(full_file.len(), layer0.size as usize);
3324            // Check that the partial read retrieved the correct bytes.
3325            let end: usize = (offset + length) as usize;
3326            assert_eq!(partial_file, full_file[offset as usize..end]);
3327        }
3328    }
3329
3330    #[tokio::test]
3331    async fn test_pull() {
3332        for &image in TEST_IMAGES {
3333            let reference = Reference::try_from(image).expect("failed to parse reference");
3334
3335            // This call likes to flake, so we try it at least 5 times
3336            let mut last_error = None;
3337            let mut image_data = None;
3338            for i in 1..6 {
3339                match Client::default()
3340                    .pull(
3341                        &reference,
3342                        &RegistryAuth::Anonymous,
3343                        vec![manifest::WASM_LAYER_MEDIA_TYPE],
3344                    )
3345                    .await
3346                {
3347                    Ok(data) => {
3348                        image_data = Some(data);
3349                        last_error = None;
3350                        break;
3351                    }
3352                    Err(e) => {
3353                        println!("Got error on pull call attempt {i}. Will retry in 1s: {e:?}");
3354                        last_error.replace(e);
3355                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
3356                    }
3357                }
3358            }
3359
3360            if let Some(e) = last_error {
3361                panic!("Unable to pull layer: {e:?}");
3362            }
3363
3364            assert!(image_data.is_some());
3365            let image_data = image_data.unwrap();
3366            assert!(!image_data.layers.is_empty());
3367            assert!(image_data.digest.is_some());
3368        }
3369    }
3370
3371    /// Attempting to pull an image without any layer validation should fail.
3372    #[tokio::test]
3373    async fn test_pull_without_layer_validation() {
3374        for &image in TEST_IMAGES {
3375            let reference = Reference::try_from(image).expect("failed to parse reference");
3376            assert!(Client::default()
3377                .pull(&reference, &RegistryAuth::Anonymous, vec![],)
3378                .await
3379                .is_err());
3380        }
3381    }
3382
3383    /// Attempting to pull an image with the wrong list of layer validations should fail.
3384    #[tokio::test]
3385    async fn test_pull_wrong_layer_validation() {
3386        for &image in TEST_IMAGES {
3387            let reference = Reference::try_from(image).expect("failed to parse reference");
3388            assert!(Client::default()
3389                .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
3390                .await
3391                .is_err());
3392        }
3393    }
3394
3395    // This is the latest build of distribution/distribution from the `main` branch
3396    // Until distribution v3 is relased, this is the only way to have this fix
3397    // https://github.com/distribution/distribution/pull/3143
3398    //
3399    // We require this fix only when testing the capability to list tags
3400    #[cfg(feature = "test-registry")]
3401    fn registry_image_edge() -> GenericImage {
3402        GenericImage::new("distribution/distribution", "edge")
3403            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3404    }
3405
3406    #[cfg(feature = "test-registry")]
3407    fn registry_image() -> GenericImage {
3408        GenericImage::new("docker.io/library/registry", "2")
3409            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3410    }
3411
3412    #[cfg(feature = "test-registry")]
3413    fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
3414        GenericImage::new("docker.io/library/registry", "2")
3415            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3416            .with_env_var("REGISTRY_AUTH", "htpasswd")
3417            .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
3418            .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
3419            .with_mount(Mount::bind_mount(auth_path, "/auth"))
3420    }
3421
3422    #[tokio::test]
3423    #[cfg(feature = "test-registry")]
3424    async fn can_push_chunk() {
3425        let test_container = registry_image()
3426            .start()
3427            .await
3428            .expect("Failed to start registry container");
3429        let port = test_container
3430            .get_host_port_ipv4(5000)
3431            .await
3432            .expect("Failed to get port");
3433
3434        let c = Client::new(ClientConfig {
3435            protocol: ClientProtocol::Http,
3436            ..Default::default()
3437        });
3438        let url = format!("localhost:{port}/hello-wasm:v1");
3439        let image: Reference = url.parse().unwrap();
3440
3441        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3442            .await
3443            .expect("result from auth request");
3444
3445        let location = c
3446            .begin_push_chunked_session(&image)
3447            .await
3448            .expect("failed to begin push session");
3449
3450        let image_data = Bytes::from(b"iamawebassemblymodule".to_vec());
3451        let (next_location, next_byte) = c
3452            .push_chunk(&location, &image, image_data.clone(), 0)
3453            .await
3454            .expect("failed to push layer");
3455
3456        // Location should include original URL with at session ID appended
3457        assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
3458        assert_eq!(next_byte, image_data.len());
3459
3460        let layer_location = c
3461            .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data))
3462            .await
3463            .expect("failed to end push session");
3464
3465        assert_eq!(layer_location, format!("http://localhost:{port}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b"));
3466    }
3467
3468    #[tokio::test]
3469    #[cfg(feature = "test-registry")]
3470    async fn can_push_multiple_chunks() {
3471        let test_container = registry_image()
3472            .start()
3473            .await
3474            .expect("Failed to start registry container");
3475        let port = test_container
3476            .get_host_port_ipv4(5000)
3477            .await
3478            .expect("Failed to get port");
3479
3480        let mut c = Client::new(ClientConfig {
3481            protocol: ClientProtocol::Http,
3482            ..Default::default()
3483        });
3484        // set a super small chunk size - done to force multiple pushes
3485        c.push_chunk_size = 3;
3486        let url = format!("localhost:{port}/hello-wasm:v1");
3487        let image: Reference = url.parse().unwrap();
3488
3489        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3490            .await
3491            .expect("result from auth request");
3492
3493        let image_data: Vec<u8> =
3494            b"i am a big webassembly mode that needs chunked uploads".to_vec();
3495        let image_digest = sha256_digest(&image_data);
3496
3497        let location = c
3498            .push_blob_chunked(&image, image_data, &image_digest)
3499            .await
3500            .expect("failed to begin push session");
3501
3502        assert_eq!(
3503            location,
3504            format!("http://localhost:{port}/v2/hello-wasm/blobs/{image_digest}")
3505        );
3506    }
3507
3508    #[tokio::test]
3509    #[cfg(feature = "test-registry")]
3510    async fn test_image_roundtrip_anon_auth() {
3511        let test_container = registry_image()
3512            .start()
3513            .await
3514            .expect("Failed to start registry container");
3515
3516        test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3517    }
3518
3519    #[tokio::test]
3520    #[cfg(feature = "test-registry")]
3521    async fn test_image_roundtrip_basic_auth() {
3522        let auth_dir = TempDir::new().expect("cannot create tmp directory");
3523        let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3524        fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3525
3526        let image = registry_image_basic_auth(
3527            auth_dir
3528                .path()
3529                .to_str()
3530                .expect("cannot convert htpasswd_path to string"),
3531        );
3532        let test_container = image.start().await.expect("cannot registry container");
3533
3534        let auth =
3535            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3536
3537        test_image_roundtrip(&auth, &test_container).await;
3538    }
3539
3540    #[cfg(feature = "test-registry")]
3541    async fn test_image_roundtrip(
3542        registry_auth: &RegistryAuth,
3543        test_container: &testcontainers::ContainerAsync<GenericImage>,
3544    ) {
3545        let _ = tracing_subscriber::fmt::try_init();
3546        let port = test_container
3547            .get_host_port_ipv4(5000)
3548            .await
3549            .expect("Failed to get port");
3550
3551        let c = Client::new(ClientConfig {
3552            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3553            ..Default::default()
3554        });
3555
3556        // pulling webassembly.azurecr.io/hello-wasm:v1
3557        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3558        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3559            .await
3560            .expect("cannot authenticate against registry for pull operation");
3561
3562        let (manifest, _digest) = c
3563            ._pull_image_manifest(&image)
3564            .await
3565            .expect("failed to pull manifest");
3566
3567        let image_data = c
3568            .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3569            .await
3570            .expect("failed to pull image");
3571
3572        let push_image: Reference = format!("localhost:{port}/hello-wasm:v1").parse().unwrap();
3573        c.auth(&push_image, registry_auth, RegistryOperation::Push)
3574            .await
3575            .expect("authenticated");
3576
3577        c.push(
3578            &push_image,
3579            &image_data.layers,
3580            image_data.config.clone(),
3581            registry_auth,
3582            Some(manifest.clone()),
3583        )
3584        .await
3585        .expect("failed to push image");
3586
3587        let pulled_image_data = c
3588            .pull(
3589                &push_image,
3590                registry_auth,
3591                vec![manifest::WASM_LAYER_MEDIA_TYPE],
3592            )
3593            .await
3594            .expect("failed to pull pushed image");
3595
3596        let (pulled_manifest, _digest) = c
3597            ._pull_image_manifest(&push_image)
3598            .await
3599            .expect("failed to pull pushed image manifest");
3600
3601        assert!(image_data.layers.len() == 1);
3602        assert!(pulled_image_data.layers.len() == 1);
3603        assert_eq!(
3604            image_data.layers[0].data.len(),
3605            pulled_image_data.layers[0].data.len()
3606        );
3607        assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
3608
3609        assert_eq!(manifest.media_type, pulled_manifest.media_type);
3610        assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
3611        assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
3612    }
3613
3614    #[tokio::test]
3615    async fn test_raw_manifest_digest() {
3616        let _ = tracing_subscriber::fmt::try_init();
3617
3618        let c = Client::default();
3619
3620        // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7
3621        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3622        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3623            .await
3624            .expect("cannot authenticate against registry for pull operation");
3625
3626        let (manifest, _) = c
3627            .pull_manifest_raw(
3628                &image,
3629                &RegistryAuth::Anonymous,
3630                MIME_TYPES_DISTRIBUTION_MANIFEST,
3631            )
3632            .await
3633            .expect("failed to pull manifest");
3634
3635        // Compute the digest of the returned manifest text.
3636        let digest = sha2::Sha256::digest(manifest);
3637        let hex = format!("sha256:{}", hex::encode(digest));
3638
3639        // Validate that the computed digest and the digest in the pulled reference match.
3640        assert_eq!(image.digest().unwrap(), hex);
3641    }
3642
3643    #[tokio::test]
3644    #[cfg(feature = "test-registry")]
3645    async fn test_mount() {
3646        // initialize the registry
3647        let test_container = registry_image()
3648            .start()
3649            .await
3650            .expect("Failed to start registry");
3651        let port = test_container
3652            .get_host_port_ipv4(5000)
3653            .await
3654            .expect("Failed to get port");
3655
3656        let c = Client::new(ClientConfig {
3657            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3658            ..Default::default()
3659        });
3660
3661        // Create a dummy layer and push it to `layer-repository`
3662        let layer_reference: Reference = format!("localhost:{port}/layer-repository")
3663            .parse()
3664            .unwrap();
3665        let layer_data = vec![1u8, 2, 3, 4];
3666        let layer = OciDescriptor {
3667            digest: sha256_digest(&layer_data),
3668            ..Default::default()
3669        };
3670        c.push_blob(
3671            &layer_reference,
3672            Bytes::copy_from_slice(&layer_data),
3673            &layer.digest,
3674        )
3675        .await
3676        .expect("Failed to push");
3677
3678        // Mount the layer at `image-repository`
3679        let image_reference: Reference = format!("localhost:{port}/image-repository")
3680            .parse()
3681            .unwrap();
3682        c.mount_blob(&image_reference, &layer_reference, &layer.digest)
3683            .await
3684            .expect("Failed to mount");
3685
3686        // Pull the layer from `image-repository`
3687        let mut buf = Vec::new();
3688        c.pull_blob(&image_reference, &layer, &mut buf)
3689            .await
3690            .expect("Failed to pull");
3691
3692        assert_eq!(layer_data, buf);
3693    }
3694
3695    #[tokio::test]
3696    async fn test_platform_resolution() {
3697        // test that we get an error when we pull a manifest list
3698        let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3699        let mut c = Client::new(ClientConfig {
3700            platform_resolver: None,
3701            ..Default::default()
3702        });
3703        let err = c
3704            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3705            .await
3706            .unwrap_err();
3707        assert_eq!(
3708            format!("{err}"),
3709            "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3710        );
3711
3712        c = Client::new(ClientConfig {
3713            platform_resolver: Some(Box::new(linux_amd64_resolver)),
3714            ..Default::default()
3715        });
3716        let (_manifest, digest) = c
3717            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3718            .await
3719            .expect("Couldn't pull manifest");
3720        assert_eq!(
3721            digest,
3722            "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3723        );
3724    }
3725
3726    #[tokio::test]
3727    async fn test_pull_ghcr_io() {
3728        let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3729        let c = Client::default();
3730        let (manifest, _manifest_str) = c
3731            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3732            .await
3733            .unwrap();
3734        assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3735    }
3736
3737    #[tokio::test]
3738    #[ignore]
3739    async fn test_roundtrip_multiple_layers() {
3740        let _ = tracing_subscriber::fmt::try_init();
3741        let c = Client::new(ClientConfig {
3742            protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3743            ..Default::default()
3744        });
3745        let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3746        let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3747            .expect("failed to parse reference");
3748
3749        let image = c
3750            .pull(
3751                &src_image,
3752                &RegistryAuth::Anonymous,
3753                vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3754            )
3755            .await
3756            .expect("Failed to pull manifest");
3757        assert!(image.layers.len() > 1);
3758
3759        let ImageData {
3760            layers,
3761            config,
3762            manifest,
3763            ..
3764        } = image;
3765        c.push(
3766            &dest_image,
3767            &layers,
3768            config,
3769            &RegistryAuth::Anonymous,
3770            manifest,
3771        )
3772        .await
3773        .expect("Failed to pull manifest");
3774
3775        c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3776            .await
3777            .expect("Failed to pull manifest");
3778    }
3779
3780    #[tokio::test]
3781    async fn test_hashable_image_layer() {
3782        use itertools::Itertools;
3783
3784        // First two should be identical; others differ
3785        let image_layers = Vec::from([
3786            ImageLayer {
3787                data: Bytes::from_static(&[0, 1, 2, 3]),
3788                media_type: "media_type".to_owned(),
3789                annotations: Some(BTreeMap::from([
3790                    ("0".to_owned(), "1".to_owned()),
3791                    ("2".to_owned(), "3".to_owned()),
3792                ])),
3793            },
3794            ImageLayer {
3795                data: Bytes::from_static(&[0, 1, 2, 3]),
3796                media_type: "media_type".to_owned(),
3797                annotations: Some(BTreeMap::from([
3798                    ("2".to_owned(), "3".to_owned()),
3799                    ("0".to_owned(), "1".to_owned()),
3800                ])),
3801            },
3802            ImageLayer {
3803                data: Bytes::from_static(&[0, 1, 2, 3]),
3804                media_type: "different_media_type".to_owned(),
3805                annotations: Some(BTreeMap::from([
3806                    ("0".to_owned(), "1".to_owned()),
3807                    ("2".to_owned(), "3".to_owned()),
3808                ])),
3809            },
3810            ImageLayer {
3811                data: Bytes::from_static(&[0, 1, 2]),
3812                media_type: "media_type".to_owned(),
3813                annotations: Some(BTreeMap::from([
3814                    ("0".to_owned(), "1".to_owned()),
3815                    ("2".to_owned(), "3".to_owned()),
3816                ])),
3817            },
3818            ImageLayer {
3819                data: Bytes::from_static(&[0, 1, 2, 3]),
3820                media_type: "media_type".to_owned(),
3821                annotations: Some(BTreeMap::from([
3822                    ("1".to_owned(), "0".to_owned()),
3823                    ("2".to_owned(), "3".to_owned()),
3824                ])),
3825            },
3826        ]);
3827
3828        assert_eq!(
3829            &image_layers[0], &image_layers[1],
3830            "image_layers[0] should equal image_layers[1]"
3831        );
3832        assert_ne!(
3833            &image_layers[0], &image_layers[2],
3834            "image_layers[0] should not equal image_layers[2]"
3835        );
3836        assert_ne!(
3837            &image_layers[0], &image_layers[3],
3838            "image_layers[0] should not equal image_layers[3]"
3839        );
3840        assert_ne!(
3841            &image_layers[0], &image_layers[4],
3842            "image_layers[0] should not equal image_layers[4]"
3843        );
3844        assert_ne!(
3845            &image_layers[2], &image_layers[3],
3846            "image_layers[2] should not equal image_layers[3]"
3847        );
3848        assert_ne!(
3849            &image_layers[2], &image_layers[4],
3850            "image_layers[2] should not equal image_layers[4]"
3851        );
3852        assert_ne!(
3853            &image_layers[3], &image_layers[4],
3854            "image_layers[3] should not equal image_layers[4]"
3855        );
3856
3857        let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3858        assert_eq!(
3859            image_layers.len() - 1,
3860            deduped.len(),
3861            "after deduplication, there should be one less image layer"
3862        );
3863    }
3864
3865    #[tokio::test]
3866    #[cfg(feature = "test-registry")]
3867    async fn test_blob_exists() {
3868        let real_registry = registry_image_edge()
3869            .start()
3870            .await
3871            .expect("Failed to start registry container");
3872
3873        let server_port = real_registry
3874            .get_host_port_ipv4(5000)
3875            .await
3876            .expect("Failed to get port");
3877
3878        let client = Client::new(ClientConfig {
3879            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3880            ..Default::default()
3881        });
3882
3883        let reference = Reference::try_from(format!("localhost:{server_port}/empty"))
3884            .expect("failed to parse reference");
3885
3886        assert!(!client
3887            .blob_exists(&reference, EMPTY_JSON_DIGEST)
3888            .await
3889            .expect("failed to check blob existence"));
3890        client
3891            .push_blob(&reference, EMPTY_JSON_BLOB.as_bytes(), EMPTY_JSON_DIGEST)
3892            .await
3893            .expect("failed to push empty json blob");
3894        assert!(client
3895            .blob_exists(&reference, EMPTY_JSON_DIGEST)
3896            .await
3897            .expect("failed to check blob existence"));
3898    }
3899
3900    #[tokio::test]
3901    #[cfg(feature = "test-registry")]
3902    async fn test_push_stream() {
3903        let real_registry = registry_image_edge()
3904            .start()
3905            .await
3906            .expect("Failed to start registry container");
3907
3908        let server_port = real_registry
3909            .get_host_port_ipv4(5000)
3910            .await
3911            .expect("Failed to get port");
3912
3913        let mut client = Client::new(ClientConfig {
3914            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3915            ..Default::default()
3916        });
3917        client.push_chunk_size = 253;
3918
3919        // hash for a byte array counting 16 times from 0 to 255 ([0, 1. 2...., 255] * 16)
3920        let data_hash = "sha256:c8f5d0341d54d951a71b136e6e2afcb14d11ed8489a7ae126a8fee0df6ecf193";
3921        let data_stream = |repeat| {
3922            futures_util::stream::repeat(Bytes::from_iter(0..=255))
3923                .take(repeat)
3924                .map(Ok)
3925        };
3926
3927        let reference = Reference::try_from(format!("localhost:{server_port}/test-push-stream"))
3928            .expect("failed to parse reference");
3929
3930        // Sanity check: verify that the server rejects the push if the blob has a mismatched digest
3931        client
3932            .push_blob_stream(&reference, data_stream(1), data_hash)
3933            .await
3934            .expect_err("expected push to fail with mismatched digest");
3935
3936        // Now push the stream with the correct digest
3937        client
3938            .push_blob_stream(&reference, data_stream(16), data_hash)
3939            .await
3940            .expect("failed to push stream");
3941
3942        assert!(client
3943            .blob_exists(&reference, data_hash)
3944            .await
3945            .expect("failed to check blob existence"));
3946    }
3947
3948    /// Push a minimal OCI image manifest (empty config blob, no layers) to the registry and
3949    /// return its digest.
3950    ///
3951    /// The manifest is pushed under the given `reference`.  The caller is responsible for
3952    /// authenticating the client for push operations beforehand.
3953    #[cfg(feature = "test-registry")]
3954    async fn push_minimal_manifest(
3955        client: &Client,
3956        reference: &Reference,
3957        artifact_type: Option<&str>,
3958    ) -> String {
3959        // Empty config blob.
3960        let config_data = b"{}";
3961        let config_digest = sha256_digest(config_data);
3962        client
3963            .push_blob(reference, config_data.as_slice(), &config_digest)
3964            .await
3965            .expect("failed to push config blob");
3966
3967        let manifest = OciImageManifest {
3968            schema_version: 2,
3969            media_type: Some(manifest::OCI_IMAGE_MEDIA_TYPE.to_string()),
3970            artifact_type: artifact_type.map(str::to_string),
3971            config: OciDescriptor {
3972                media_type: manifest::IMAGE_CONFIG_MEDIA_TYPE.to_string(),
3973                digest: config_digest.clone(),
3974                size: config_data.len() as i64,
3975                ..Default::default()
3976            },
3977            layers: vec![],
3978            subject: None,
3979            annotations: None,
3980        };
3981
3982        let oci_manifest = OciManifest::Image(manifest);
3983        client
3984            .push_manifest(reference, &oci_manifest)
3985            .await
3986            .expect("failed to push manifest")
3987            // push_manifest returns the URL; extract the digest from the end
3988            .rsplit('/')
3989            .next()
3990            .expect("manifest URL has no digest component")
3991            .to_string()
3992    }
3993
3994    /// `distribution/distribution` does not implement the native OCI referrers API — it returns 404 for
3995    /// `/v2/<name>/referrers/<digest>`.  These tests verify that `pull_referrers` correctly
3996    /// falls back to the referrers tag schema in that situation.
3997    ///
3998    /// Referrers support is being tracked upstream by this issue: https://github.com/distribution/distribution/issues/3716
3999    ///
4000    /// Setup overview:
4001    ///   1. Push a "target" image manifest to get its digest.
4002    ///   2. Manually build and push an `OciImageIndex` as the referrers tag
4003    ///      (`sha256-<target-digest>`), containing descriptor entries for two
4004    ///      hypothetical referrers with different `artifact_type` values.
4005    ///   3. Call `pull_referrers` and verify that the fallback is used and the
4006    ///      returned index contains the expected entries (both unfiltered and filtered).
4007    #[tokio::test]
4008    #[cfg(feature = "test-registry")]
4009    async fn test_pull_referrers_with_tag_schema_fallback() {
4010        let test_container = registry_image()
4011            .start()
4012            .await
4013            .expect("Failed to start registry container");
4014        let port = test_container
4015            .get_host_port_ipv4(5000)
4016            .await
4017            .expect("Failed to get port");
4018
4019        let client = Client::new(ClientConfig {
4020            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{port}")]),
4021            ..Default::default()
4022        });
4023
4024        let repo = format!("localhost:{port}/referrers-test");
4025
4026        // --- Step 1: push the target manifest ---
4027        let target_ref: Reference = format!("{repo}:target").parse().unwrap();
4028        client
4029            .auth(
4030                &target_ref,
4031                &RegistryAuth::Anonymous,
4032                RegistryOperation::Push,
4033            )
4034            .await
4035            .expect("failed to authenticate for push");
4036        let target_digest = push_minimal_manifest(&client, &target_ref, None).await;
4037
4038        // --- Step 2: push a referrers tag index ---
4039        //
4040        // The tag is the target digest with ':' replaced by '-'.
4041        // We include two descriptors so we can also test artifact_type filtering:
4042        //   - one with artifact_type "application/vnd.test.sig"
4043        //   - one with artifact_type "application/vnd.test.sbom"
4044        const SIG_ARTIFACT_TYPE: &str = "application/vnd.test.sig";
4045        const SBOM_ARTIFACT_TYPE: &str = "application/vnd.test.sbom";
4046
4047        // Push two real minimal manifests to use as referrer entries.
4048        let sig_ref: Reference = format!("{repo}:sig").parse().unwrap();
4049        let sig_digest = push_minimal_manifest(&client, &sig_ref, Some(SIG_ARTIFACT_TYPE)).await;
4050
4051        let sbom_ref: Reference = format!("{repo}:sbom").parse().unwrap();
4052        let sbom_digest = push_minimal_manifest(&client, &sbom_ref, Some(SBOM_ARTIFACT_TYPE)).await;
4053
4054        // Pull the manifests back to get the accurate serialised sizes.
4055        let (sig_raw, _) = client
4056            .pull_manifest_raw(
4057                &sig_ref,
4058                &RegistryAuth::Anonymous,
4059                MIME_TYPES_DISTRIBUTION_MANIFEST,
4060            )
4061            .await
4062            .expect("failed to pull sig manifest raw");
4063        let sig_size = sig_raw.len() as i64;
4064
4065        let (sbom_raw, _) = client
4066            .pull_manifest_raw(
4067                &sbom_ref,
4068                &RegistryAuth::Anonymous,
4069                MIME_TYPES_DISTRIBUTION_MANIFEST,
4070            )
4071            .await
4072            .expect("failed to pull sbom manifest raw");
4073        let sbom_size = sbom_raw.len() as i64;
4074
4075        let referrers_index = OciImageIndex {
4076            schema_version: 2,
4077            media_type: Some(manifest::OCI_IMAGE_INDEX_MEDIA_TYPE.to_string()),
4078            artifact_type: None,
4079            annotations: None,
4080            manifests: vec![
4081                ImageIndexEntry {
4082                    media_type: manifest::OCI_IMAGE_MEDIA_TYPE.to_string(),
4083                    digest: sig_digest,
4084                    size: sig_size,
4085                    artifact_type: Some(SIG_ARTIFACT_TYPE.to_string()),
4086                    platform: None,
4087                    annotations: None,
4088                },
4089                ImageIndexEntry {
4090                    media_type: manifest::OCI_IMAGE_MEDIA_TYPE.to_string(),
4091                    digest: sbom_digest,
4092                    size: sbom_size,
4093                    artifact_type: Some(SBOM_ARTIFACT_TYPE.to_string()),
4094                    platform: None,
4095                    annotations: None,
4096                },
4097            ],
4098        };
4099
4100        let fallback_tag = target_digest.replace(':', "-");
4101        let tag_ref: Reference = format!("{repo}:{fallback_tag}").parse().unwrap();
4102        client
4103            .push_manifest(&tag_ref, &OciManifest::ImageIndex(referrers_index))
4104            .await
4105            .expect("failed to push referrers tag index");
4106
4107        // --- Step 3: pull_referrers — no filter, expect both entries ---
4108        let digest_ref = Reference::with_digest(
4109            format!("localhost:{port}"),
4110            "referrers-test".to_string(),
4111            target_digest.clone(),
4112        );
4113        client
4114            .auth(
4115                &digest_ref,
4116                &RegistryAuth::Anonymous,
4117                RegistryOperation::Pull,
4118            )
4119            .await
4120            .expect("failed to authenticate for pull");
4121
4122        let index = client
4123            .pull_referrers(&digest_ref, None)
4124            .await
4125            .expect("pull_referrers failed");
4126        assert_eq!(
4127            index.manifests.len(),
4128            2,
4129            "expected 2 referrers (unfiltered), got {:?}",
4130            index.manifests
4131        );
4132
4133        // --- Step 4: pull_referrers — filtered by SIG_ARTIFACT_TYPE ---
4134        let index_filtered = client
4135            .pull_referrers(&digest_ref, Some(SIG_ARTIFACT_TYPE))
4136            .await
4137            .expect("pull_referrers with artifact_type filter failed");
4138        assert_eq!(
4139            index_filtered.manifests.len(),
4140            1,
4141            "expected 1 referrer after filtering by {SIG_ARTIFACT_TYPE}, got {:?}",
4142            index_filtered.manifests
4143        );
4144        assert_eq!(
4145            index_filtered.manifests[0].artifact_type.as_deref(),
4146            Some(SIG_ARTIFACT_TYPE),
4147        );
4148    }
4149
4150    /// Verify that `pull_referrers` returns an empty index when neither the native referrers
4151    /// API nor the referrers tag schema returns anything — i.e. the target image exists but
4152    /// has no referrers at all.
4153    #[tokio::test]
4154    #[cfg(feature = "test-registry")]
4155    async fn test_pull_referrers_no_tag_schema() {
4156        let test_container = registry_image()
4157            .start()
4158            .await
4159            .expect("Failed to start registry container");
4160        let port = test_container
4161            .get_host_port_ipv4(5000)
4162            .await
4163            .expect("Failed to get port");
4164
4165        let client = Client::new(ClientConfig {
4166            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{port}")]),
4167            ..Default::default()
4168        });
4169
4170        let repo = format!("localhost:{port}/referrers-none-test");
4171
4172        // Push a target manifest — but do NOT push any referrers tag.
4173        let target_ref: Reference = format!("{repo}:target").parse().unwrap();
4174        client
4175            .auth(
4176                &target_ref,
4177                &RegistryAuth::Anonymous,
4178                RegistryOperation::Push,
4179            )
4180            .await
4181            .expect("failed to authenticate for push");
4182        let target_digest = push_minimal_manifest(&client, &target_ref, None).await;
4183
4184        let digest_ref = Reference::with_digest(
4185            format!("localhost:{port}"),
4186            "referrers-none-test".to_string(),
4187            target_digest,
4188        );
4189        client
4190            .auth(
4191                &digest_ref,
4192                &RegistryAuth::Anonymous,
4193                RegistryOperation::Pull,
4194            )
4195            .await
4196            .expect("failed to authenticate for pull");
4197
4198        let index = client
4199            .pull_referrers(&digest_ref, None)
4200            .await
4201            .expect("pull_referrers should succeed (returning empty index)");
4202        assert!(
4203            index.manifests.is_empty(),
4204            "expected empty referrers index, got {:?}",
4205            index.manifests
4206        );
4207    }
4208}