wasm_pkg_loader/source/
warg.rs1mod config;
2
3use anyhow::anyhow;
4use async_trait::async_trait;
5use bytes::Bytes;
6use config::WargConfig;
7use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
8use serde::Deserialize;
9use warg_client::{storage::PackageInfo, ClientError, FileSystemClient};
10use warg_protocol::registry::PackageName;
11use wasm_pkg_common::{
12 config::RegistryConfig,
13 metadata::RegistryMetadata,
14 package::{PackageRef, Version},
15 registry::Registry,
16 Error,
17};
18
19use crate::{
20 source::{PackageSource, VersionInfo},
21 Release,
22};
23
24#[derive(Debug, Default, Deserialize)]
25#[serde(rename_all = "camelCase")]
26struct WargRegistryMetadata {
27 url: Option<String>,
28}
29
30pub struct WargSource {
31 client: FileSystemClient,
32}
33
34impl WargSource {
35 pub async fn new(
36 registry: &Registry,
37 registry_config: &RegistryConfig,
38 registry_meta: &RegistryMetadata,
39 ) -> Result<Self, Error> {
40 let warg_meta = registry_meta
41 .protocol_config::<WargRegistryMetadata>("warg")?
42 .unwrap_or_default();
43 let url = warg_meta.url.unwrap_or_else(|| registry.to_string());
44 let WargConfig {
45 client_config,
46 auth_token,
47 } = registry_config.try_into()?;
48
49 let client_config = if let Some(client_config) = client_config {
50 client_config
51 } else {
52 warg_client::Config::from_default_file()
53 .map_err(Error::InvalidConfig)?
54 .unwrap_or_default()
55 };
56 let client =
57 FileSystemClient::new_with_config(Some(url.as_str()), &client_config, auth_token)
58 .await
59 .map_err(warg_registry_error)?;
60 Ok(Self { client })
61 }
62
63 async fn fetch_package_info(&mut self, package: &PackageRef) -> Result<PackageInfo, Error> {
64 let package_name = package_ref_to_name(package)?;
65 self.client
66 .package(&package_name)
67 .await
68 .map_err(warg_registry_error)
69 }
70}
71
72#[async_trait]
73impl PackageSource for WargSource {
74 async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
75 let info = self.fetch_package_info(package).await?;
76 Ok(info
77 .state
78 .releases()
79 .map(|r| VersionInfo {
80 version: r.version.clone(),
81 yanked: r.yanked(),
82 })
83 .collect())
84 }
85
86 async fn get_release(
87 &mut self,
88 package: &PackageRef,
89 version: &Version,
90 ) -> Result<Release, Error> {
91 let info = self.fetch_package_info(package).await?;
92 let release = info
93 .state
94 .release(version)
95 .ok_or_else(|| Error::VersionNotFound(version.clone()))?;
96 let content_digest = release
97 .content()
98 .ok_or_else(|| Error::RegistryError(anyhow!("version {version} yanked")))?
99 .to_string();
100 Ok(Release {
101 version: version.clone(),
102 content_digest: content_digest.parse()?,
103 })
104 }
105
106 async fn stream_content_unvalidated(
107 &mut self,
108 package: &PackageRef,
109 release: &Release,
110 ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
111 self.stream_content(package, release).await
112 }
113
114 async fn stream_content(
115 &mut self,
116 package: &PackageRef,
117 release: &Release,
118 ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
119 let package_name = package_ref_to_name(package)?;
120
121 let (_, stream) = self
123 .client
124 .download_exact_as_stream(&package_name, &release.version)
125 .await
126 .map_err(warg_registry_error)?;
127 Ok(stream.map_err(Error::RegistryError).boxed())
128 }
129}
130
131fn package_ref_to_name(package_ref: &PackageRef) -> Result<PackageName, Error> {
132 PackageName::new(package_ref.to_string())
133 .map_err(|err| Error::InvalidPackageRef(err.to_string()))
134}
135
136fn warg_registry_error(err: ClientError) -> Error {
137 match err {
138 ClientError::PackageDoesNotExist { .. }
139 | ClientError::PackageDoesNotExistWithHintHeader { .. } => Error::PackageNotFound,
140 ClientError::PackageVersionDoesNotExist { version, .. } => Error::VersionNotFound(version),
141 _ => Error::RegistryError(err.into()),
142 }
143}