warg_loader/source/
warg.rs

1use async_trait::async_trait;
2use bytes::Bytes;
3use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
4use secrecy::SecretString;
5use semver::Version;
6use warg_client::{
7    storage::{PackageInfo, RegistryStorage},
8    ClientError, FileSystemClient,
9};
10use warg_protocol::registry::PackageName;
11
12use crate::{meta::RegistryMeta, source::PackageSource, Error, PackageRef, Release};
13
14#[derive(Clone, Debug, Default)]
15pub struct WargConfig {
16    pub client_config: warg_client::Config,
17    pub auth_token: Option<SecretString>,
18}
19
20pub struct WargSource {
21    client: FileSystemClient,
22    api_client: warg_client::api::Client,
23}
24
25impl WargSource {
26    pub fn new(
27        registry: String,
28        config: WargConfig,
29        registry_meta: RegistryMeta,
30    ) -> Result<Self, Error> {
31        let url = registry_meta.warg_url.unwrap_or(registry);
32        let client = FileSystemClient::new_with_config(
33            Some(url.as_str()),
34            &config.client_config,
35            config.auth_token.clone(),
36        )?;
37        let api_client = warg_client::api::Client::new(client.url().to_string(), config.auth_token)
38            .map_err(ClientError::Other)?;
39        Ok(Self { client, api_client })
40    }
41
42    async fn fetch_package_info(&mut self, package: &PackageRef) -> Result<PackageInfo, Error> {
43        let package_name = package.try_into()?;
44        self.client.upsert([&package_name]).await?;
45        self.client
46            .registry()
47            .load_package(self.client.get_warg_registry(), &package_name)
48            .await
49            .map_err(ClientError::Other)?
50            .ok_or_else(|| {
51                // TODO: standardize this error
52                ClientError::Other(anyhow::anyhow!("package not found")).into()
53            })
54    }
55}
56
57#[async_trait]
58impl PackageSource for WargSource {
59    async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<Version>, Error> {
60        let info = self.fetch_package_info(package).await?;
61        Ok(info
62            .state
63            .releases()
64            .map(|release| release.version.clone())
65            .collect())
66    }
67
68    async fn get_release(
69        &mut self,
70        package: &PackageRef,
71        version: &Version,
72    ) -> Result<Release, Error> {
73        let info = self.fetch_package_info(package).await?;
74        let release = info
75            .state
76            .release(version)
77            // TODO: standardize this error
78            .ok_or_else(|| ClientError::Other(anyhow::anyhow!("version not found")))?;
79        let content_digest = release
80            .content()
81            .ok_or_else(|| ClientError::Other(anyhow::anyhow!("release has been yanked")))?
82            .to_string();
83        Ok(Release {
84            version: version.clone(),
85            content_digest: content_digest.parse()?,
86        })
87    }
88
89    async fn stream_content_unvalidated(
90        &mut self,
91        _package: &PackageRef,
92        release: &Release,
93    ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
94        let digest = release
95            .content_digest
96            .to_string()
97            .parse()
98            .map_err(|err| ClientError::Other(anyhow::anyhow!("{err}")))?;
99        let stream = self
100            .api_client
101            .download_content(&digest)
102            .await
103            .map_err(ClientError::Api)?;
104        Ok(stream
105            .map_err(|err| Error::WargError(ClientError::Other(err)))
106            .boxed())
107    }
108}
109
110impl TryFrom<&PackageRef> for PackageName {
111    type Error = Error;
112
113    fn try_from(value: &PackageRef) -> Result<Self, Self::Error> {
114        Self::new(value.to_string()).map_err(|err| Error::WargError(ClientError::Other(err)))
115    }
116}