1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
mod config;
mod label;
mod local;
mod meta;
mod oci;
mod package;
mod release;
mod toml;

use std::collections::HashMap;

use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::BoxStream;
use local::LocalSource;
use oci::{OciConfig, OciSource};
use oci_distribution::errors::OciDistributionError;
pub use semver::Version;

/// Re-exported to ease configuration.
pub use oci_distribution::client as oci_client;

pub use crate::{
    config::ClientConfig,
    package::PackageRef,
    release::{ContentDigest, Release},
};
use crate::{
    label::{InvalidLabel, Label},
    meta::RegistryMeta,
};

/// A read-only registry client.
pub struct Client {
    config: ClientConfig,
    sources: HashMap<String, Box<dyn PackageSource>>,
}

#[async_trait]
trait PackageSource {
    async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<Version>, Error>;

    async fn get_release(
        &mut self,
        package: &PackageRef,
        version: &Version,
    ) -> Result<Release, Error>;

    async fn stream_content(
        &mut self,
        package: &PackageRef,
        release: &Release,
    ) -> Result<BoxStream<Result<Bytes, Error>>, Error>;
}

impl Client {
    /// Returns a new client with the given [`ClientConfig`].
    pub fn new(config: ClientConfig) -> Self {
        Self {
            config,
            sources: Default::default(),
        }
    }

    /// Returns a new client configured from the default config file path.
    /// Returns Ok(None) if the default config file does not exist.
    pub fn from_default_config_file() -> Result<Option<Self>, Error> {
        Ok(ClientConfig::from_default_file()?.map(Self::new))
    }

    /// Returns a list of all package [`Version`]s available for the given package.
    pub async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<Version>, Error> {
        let source = self.resolve_source(package).await?;
        source.list_all_versions(package).await
    }

    /// Returns a [`Release`] for the given package version.
    pub async fn get_release(
        &mut self,
        package: &PackageRef,
        version: &Version,
    ) -> Result<Release, Error> {
        let source = self.resolve_source(package).await?;
        source.get_release(package, version).await
    }

    /// Returns a [`BoxStream`] of content chunks.
    pub async fn stream_content(
        &mut self,
        package: &PackageRef,
        release: &Release,
    ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
        let source = self.resolve_source(package).await?;
        source.stream_content(package, release).await
    }

    async fn resolve_source(
        &mut self,
        package: &PackageRef,
    ) -> Result<&mut dyn PackageSource, Error> {
        let registry = self.config.resolve_package_registry(package)?.to_owned();
        if !self.sources.contains_key(&registry) {
            let registry_config = self
                .config
                .registry_configs
                .get(&registry)
                .cloned()
                .unwrap_or_default();

            tracing::debug!("Resolved registry config: {registry_config:?}");

            let source: Box<dyn PackageSource> = match registry_config {
                config::RegistryConfig::Local(config) => Box::new(LocalSource::new(config)),
                config::RegistryConfig::Oci(config) => {
                    Box::new(self.build_oci_client(&registry, config).await?)
                }
            };
            self.sources.insert(registry.clone(), source);
        }
        Ok(self.sources.get_mut(&registry).unwrap().as_mut())
    }

    async fn build_oci_client(
        &mut self,
        registry: &str,
        config: OciConfig,
    ) -> Result<OciSource, Error> {
        tracing::debug!("Building new OCI client for {registry:?}");

        // Check registry metadata for OCI registry override
        let registry_meta = match RegistryMeta::fetch(registry).await {
            Ok(Some(meta)) => {
                tracing::debug!("Got registry metadata {meta:?}");
                meta
            }
            Ok(None) => {
                tracing::debug!("Metadata not found");
                Default::default()
            }
            Err(err) => {
                tracing::warn!("Error fetching registry metadata: {err}");
                Default::default()
            }
        };

        OciSource::new(registry.to_string(), config, registry_meta)
    }
}

#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    #[error("failed to get registry credentials: {0:#}")]
    CredentialError(anyhow::Error),
    #[error("invalid config: {0:#}")]
    InvalidConfig(anyhow::Error),
    #[error("invalid content digest: {0}")]
    InvalidContentDigest(String),
    #[error("invalid label: {0}")]
    InvalidLabel(#[from] InvalidLabel),
    #[error("invalid package ref: {0}")]
    InvalidPackageRef(String),
    #[error("invalid package manifest: {0}")]
    InvalidPackageManifest(String),
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("OCI error: {0}")]
    OciError(#[from] OciDistributionError),
    #[error("no registry configured for namespace {0:?}")]
    NoRegistryForNamespace(Label),
    #[error("registry metadata error: {0:#}")]
    RegistryMeta(#[source] anyhow::Error),
    #[error("invalid version: {0}")]
    VersionError(#[from] semver::Error),
}