wasm_pkg_loader/source/
warg.rs

1mod config;
2
3use anyhow::anyhow;
4use async_trait::async_trait;
5use bytes::Bytes;
6use config::WargConfig;
7use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
8use serde::Deserialize;
9use warg_client::{storage::PackageInfo, ClientError, FileSystemClient};
10use warg_protocol::registry::PackageName;
11use wasm_pkg_common::{
12    config::RegistryConfig,
13    metadata::RegistryMetadata,
14    package::{PackageRef, Version},
15    registry::Registry,
16    Error,
17};
18
19use crate::{
20    source::{PackageSource, VersionInfo},
21    Release,
22};
23
24#[derive(Debug, Default, Deserialize)]
25#[serde(rename_all = "camelCase")]
26struct WargRegistryMetadata {
27    url: Option<String>,
28}
29
30pub struct WargSource {
31    client: FileSystemClient,
32}
33
34impl WargSource {
35    pub async fn new(
36        registry: &Registry,
37        registry_config: &RegistryConfig,
38        registry_meta: &RegistryMetadata,
39    ) -> Result<Self, Error> {
40        let warg_meta = registry_meta
41            .protocol_config::<WargRegistryMetadata>("warg")?
42            .unwrap_or_default();
43        let url = warg_meta.url.unwrap_or_else(|| registry.to_string());
44        let WargConfig {
45            client_config,
46            auth_token,
47        } = registry_config.try_into()?;
48
49        let client_config = if let Some(client_config) = client_config {
50            client_config
51        } else {
52            warg_client::Config::from_default_file()
53                .map_err(Error::InvalidConfig)?
54                .unwrap_or_default()
55        };
56        let client =
57            FileSystemClient::new_with_config(Some(url.as_str()), &client_config, auth_token)
58                .await
59                .map_err(warg_registry_error)?;
60        Ok(Self { client })
61    }
62
63    async fn fetch_package_info(&mut self, package: &PackageRef) -> Result<PackageInfo, Error> {
64        let package_name = package_ref_to_name(package)?;
65        self.client
66            .package(&package_name)
67            .await
68            .map_err(warg_registry_error)
69    }
70}
71
72#[async_trait]
73impl PackageSource for WargSource {
74    async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
75        let info = self.fetch_package_info(package).await?;
76        Ok(info
77            .state
78            .releases()
79            .map(|r| VersionInfo {
80                version: r.version.clone(),
81                yanked: r.yanked(),
82            })
83            .collect())
84    }
85
86    async fn get_release(
87        &mut self,
88        package: &PackageRef,
89        version: &Version,
90    ) -> Result<Release, Error> {
91        let info = self.fetch_package_info(package).await?;
92        let release = info
93            .state
94            .release(version)
95            .ok_or_else(|| Error::VersionNotFound(version.clone()))?;
96        let content_digest = release
97            .content()
98            .ok_or_else(|| Error::RegistryError(anyhow!("version {version} yanked")))?
99            .to_string();
100        Ok(Release {
101            version: version.clone(),
102            content_digest: content_digest.parse()?,
103        })
104    }
105
106    async fn stream_content_unvalidated(
107        &mut self,
108        package: &PackageRef,
109        release: &Release,
110    ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
111        self.stream_content(package, release).await
112    }
113
114    async fn stream_content(
115        &mut self,
116        package: &PackageRef,
117        release: &Release,
118    ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
119        let package_name = package_ref_to_name(package)?;
120
121        // warg client validates the digest matches the content
122        let (_, stream) = self
123            .client
124            .download_exact_as_stream(&package_name, &release.version)
125            .await
126            .map_err(warg_registry_error)?;
127        Ok(stream.map_err(Error::RegistryError).boxed())
128    }
129}
130
131fn package_ref_to_name(package_ref: &PackageRef) -> Result<PackageName, Error> {
132    PackageName::new(package_ref.to_string())
133        .map_err(|err| Error::InvalidPackageRef(err.to_string()))
134}
135
136fn warg_registry_error(err: ClientError) -> Error {
137    match err {
138        ClientError::PackageDoesNotExist { .. }
139        | ClientError::PackageDoesNotExistWithHintHeader { .. } => Error::PackageNotFound,
140        ClientError::PackageVersionDoesNotExist { version, .. } => Error::VersionNotFound(version),
141        _ => Error::RegistryError(err.into()),
142    }
143}