warg_loader/source/
warg.rs1use async_trait::async_trait;
2use bytes::Bytes;
3use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
4use secrecy::SecretString;
5use semver::Version;
6use warg_client::{
7 storage::{PackageInfo, RegistryStorage},
8 ClientError, FileSystemClient,
9};
10use warg_protocol::registry::PackageName;
11
12use crate::{meta::RegistryMeta, source::PackageSource, Error, PackageRef, Release};
13
14#[derive(Clone, Debug, Default)]
15pub struct WargConfig {
16 pub client_config: warg_client::Config,
17 pub auth_token: Option<SecretString>,
18}
19
20pub struct WargSource {
21 client: FileSystemClient,
22 api_client: warg_client::api::Client,
23}
24
25impl WargSource {
26 pub fn new(
27 registry: String,
28 config: WargConfig,
29 registry_meta: RegistryMeta,
30 ) -> Result<Self, Error> {
31 let url = registry_meta.warg_url.unwrap_or(registry);
32 let client = FileSystemClient::new_with_config(
33 Some(url.as_str()),
34 &config.client_config,
35 config.auth_token.clone(),
36 )?;
37 let api_client = warg_client::api::Client::new(client.url().to_string(), config.auth_token)
38 .map_err(ClientError::Other)?;
39 Ok(Self { client, api_client })
40 }
41
42 async fn fetch_package_info(&mut self, package: &PackageRef) -> Result<PackageInfo, Error> {
43 let package_name = package.try_into()?;
44 self.client.upsert([&package_name]).await?;
45 self.client
46 .registry()
47 .load_package(self.client.get_warg_registry(), &package_name)
48 .await
49 .map_err(ClientError::Other)?
50 .ok_or_else(|| {
51 ClientError::Other(anyhow::anyhow!("package not found")).into()
53 })
54 }
55}
56
57#[async_trait]
58impl PackageSource for WargSource {
59 async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<Version>, Error> {
60 let info = self.fetch_package_info(package).await?;
61 Ok(info
62 .state
63 .releases()
64 .map(|release| release.version.clone())
65 .collect())
66 }
67
68 async fn get_release(
69 &mut self,
70 package: &PackageRef,
71 version: &Version,
72 ) -> Result<Release, Error> {
73 let info = self.fetch_package_info(package).await?;
74 let release = info
75 .state
76 .release(version)
77 .ok_or_else(|| ClientError::Other(anyhow::anyhow!("version not found")))?;
79 let content_digest = release
80 .content()
81 .ok_or_else(|| ClientError::Other(anyhow::anyhow!("release has been yanked")))?
82 .to_string();
83 Ok(Release {
84 version: version.clone(),
85 content_digest: content_digest.parse()?,
86 })
87 }
88
89 async fn stream_content_unvalidated(
90 &mut self,
91 _package: &PackageRef,
92 release: &Release,
93 ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
94 let digest = release
95 .content_digest
96 .to_string()
97 .parse()
98 .map_err(|err| ClientError::Other(anyhow::anyhow!("{err}")))?;
99 let stream = self
100 .api_client
101 .download_content(&digest)
102 .await
103 .map_err(ClientError::Api)?;
104 Ok(stream
105 .map_err(|err| Error::WargError(ClientError::Other(err)))
106 .boxed())
107 }
108}
109
110impl TryFrom<&PackageRef> for PackageName {
111 type Error = Error;
112
113 fn try_from(value: &PackageRef) -> Result<Self, Self::Error> {
114 Self::new(value.to_string()).map_err(|err| Error::WargError(ClientError::Other(err)))
115 }
116}