warg_loader/
lib.rs

1mod config;
2mod label;
3mod meta;
4mod package;
5mod release;
6mod source;
7
8use std::collections::HashMap;
9
10use bytes::Bytes;
11use futures_util::stream::BoxStream;
12use oci_distribution::errors::OciDistributionError;
13pub use semver::Version;
14use source::{
15    local::LocalSource,
16    oci::{OciConfig, OciSource},
17    warg::{WargConfig, WargSource},
18    PackageSource,
19};
20
21/// Re-exported to ease configuration.
22pub use oci_distribution::client as oci_client;
23
24pub use crate::{
25    config::ClientConfig,
26    package::PackageRef,
27    release::{ContentDigest, Release},
28};
29use crate::{
30    config::RegistryConfig,
31    label::{InvalidLabel, Label},
32    meta::RegistryMeta,
33};
34
35/// A read-only registry client.
36pub struct Client {
37    config: ClientConfig,
38    sources: HashMap<String, Box<dyn PackageSource>>,
39}
40
41impl Client {
42    /// Returns a new client with the given [`ClientConfig`].
43    pub fn new(config: ClientConfig) -> Self {
44        Self {
45            config,
46            sources: Default::default(),
47        }
48    }
49
50    /// Returns a new client configured from the default config file path.
51    /// Returns Ok(None) if the default config file does not exist.
52    pub fn from_default_config_file() -> Result<Option<Self>, Error> {
53        Ok(ClientConfig::from_default_file()?.map(Self::new))
54    }
55
56    /// Returns a list of all package [`Version`]s available for the given package.
57    pub async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<Version>, Error> {
58        let source = self.resolve_source(package).await?;
59        source.list_all_versions(package).await
60    }
61
62    /// Returns a [`Release`] for the given package version.
63    pub async fn get_release(
64        &mut self,
65        package: &PackageRef,
66        version: &Version,
67    ) -> Result<Release, Error> {
68        let source = self.resolve_source(package).await?;
69        source.get_release(package, version).await
70    }
71
72    /// Returns a [`BoxStream`] of content chunks. Contents are validated
73    /// against the given [`Release::content_digest`].
74    pub async fn stream_content(
75        &mut self,
76        package: &PackageRef,
77        release: &Release,
78    ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
79        let source = self.resolve_source(package).await?;
80        source.stream_content(package, release).await
81    }
82
83    async fn resolve_source(
84        &mut self,
85        package: &PackageRef,
86    ) -> Result<&mut dyn PackageSource, Error> {
87        let registry = self.config.resolve_package_registry(package)?.to_owned();
88        if !self.sources.contains_key(&registry) {
89            let registry_config = self.config.registry_configs.get(&registry).cloned();
90
91            tracing::debug!("Resolved registry config: {registry_config:?}");
92
93            let registry_meta = RegistryMeta::fetch_or_default(&registry).await;
94
95            let registry_config = registry_config.unwrap_or_else(|| {
96                if registry_meta.warg_url.is_some() {
97                    RegistryConfig::Warg(Default::default())
98                } else {
99                    RegistryConfig::Oci(Default::default())
100                }
101            });
102
103            let source: Box<dyn PackageSource> = match registry_config {
104                config::RegistryConfig::Local(config) => Box::new(LocalSource::new(config)),
105                config::RegistryConfig::Oci(config) => {
106                    Box::new(self.build_oci_client(&registry, config).await?)
107                }
108                config::RegistryConfig::Warg(config) => {
109                    Box::new(self.build_warg_client(&registry, config).await?)
110                }
111            };
112            self.sources.insert(registry.clone(), source);
113        }
114        Ok(self.sources.get_mut(&registry).unwrap().as_mut())
115    }
116
117    async fn build_oci_client(
118        &mut self,
119        registry: &str,
120        config: OciConfig,
121    ) -> Result<OciSource, Error> {
122        tracing::debug!("Building new OCI client for {registry:?}");
123        // Check registry metadata for OCI registry override
124        let registry_meta = RegistryMeta::fetch_or_default(registry).await;
125        OciSource::new(registry.to_string(), config, registry_meta)
126    }
127
128    async fn build_warg_client(
129        &mut self,
130        registry: &str,
131        config: WargConfig,
132    ) -> Result<WargSource, Error> {
133        tracing::debug!("Building new Warg client for {registry:?}");
134        // Check registry metadata for OCI registry override
135        let registry_meta = RegistryMeta::fetch_or_default(registry).await;
136        WargSource::new(registry.to_string(), config, registry_meta)
137    }
138}
139
140#[derive(Debug, thiserror::Error)]
141#[non_exhaustive]
142pub enum Error {
143    #[error("failed to get registry credentials: {0:#}")]
144    CredentialError(anyhow::Error),
145    #[error("invalid config: {0:#}")]
146    InvalidConfig(anyhow::Error),
147    #[error("invalid content: {0}")]
148    InvalidContent(String),
149    #[error("invalid content digest: {0}")]
150    InvalidContentDigest(String),
151    #[error("invalid label: {0}")]
152    InvalidLabel(#[from] InvalidLabel),
153    #[error("invalid package ref: {0}")]
154    InvalidPackageRef(String),
155    #[error("invalid package manifest: {0}")]
156    InvalidPackageManifest(String),
157    #[error("IO error: {0}")]
158    IoError(#[from] std::io::Error),
159    #[error("OCI error: {0}")]
160    OciError(#[from] OciDistributionError),
161    #[error("no registry configured for namespace {0:?}")]
162    NoRegistryForNamespace(Label),
163    #[error("registry metadata error: {0:#}")]
164    RegistryMeta(#[source] anyhow::Error),
165    #[error("invalid version: {0}")]
166    VersionError(#[from] semver::Error),
167    #[error("Warg error: {0}")]
168    WargError(#[from] warg_client::ClientError),
169}