1pub mod caching;
30mod loader;
31pub mod local;
32pub mod oci;
33mod publisher;
34mod release;
35pub mod warg;
36
37use std::path::Path;
38use std::sync::Arc;
39use std::{collections::HashMap, pin::Pin};
40
41use anyhow::anyhow;
42use bytes::Bytes;
43use futures_util::Stream;
44use publisher::PackagePublisher;
45use tokio::io::AsyncSeekExt;
46use tokio::sync::RwLock;
47use tokio_util::io::SyncIoBridge;
48pub use wasm_pkg_common::{
49 config::{Config, CustomConfig, RegistryMapping},
50 digest::ContentDigest,
51 metadata::RegistryMetadata,
52 package::{PackageRef, Version},
53 registry::Registry,
54 Error,
55};
56use wit_component::DecodedWasm;
57
58use crate::{loader::PackageLoader, local::LocalBackend, oci::OciBackend, warg::WargBackend};
59
60pub use release::{Release, VersionInfo};
61
62pub type ContentStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>>;
64
65pub type PublishingSource = Pin<Box<dyn ReaderSeeker + Send + Sync + 'static>>;
67
68pub trait ReaderSeeker: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
70impl<T> ReaderSeeker for T where T: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
71
72trait LoaderPublisher: PackageLoader + PackagePublisher {}
73
74impl<T> LoaderPublisher for T where T: PackageLoader + PackagePublisher {}
75
76type RegistrySources = HashMap<Registry, Arc<InnerClient>>;
77type InnerClient = Box<dyn LoaderPublisher + Sync>;
78
79#[derive(Clone, Debug, Default)]
81pub struct PublishOpts {
82 pub package: Option<(PackageRef, Version)>,
84 pub registry: Option<Registry>,
86}
87
88#[derive(Clone)]
90pub struct Client {
91 config: Arc<Config>,
92 sources: Arc<RwLock<RegistrySources>>,
93}
94
95impl Client {
96 pub fn new(config: Config) -> Self {
98 Self {
99 config: Arc::new(config),
100 sources: Default::default(),
101 }
102 }
103
104 pub fn config(&self) -> &Config {
106 &self.config
107 }
108
109 pub async fn with_global_defaults() -> Result<Self, Error> {
111 let config = Config::global_defaults().await?;
112 Ok(Self::new(config))
113 }
114
115 pub async fn list_all_versions(&self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
117 let source = self.resolve_source(package, None).await?;
118 source.list_all_versions(package).await
119 }
120
121 pub async fn get_release(
123 &self,
124 package: &PackageRef,
125 version: &Version,
126 ) -> Result<Release, Error> {
127 let source = self.resolve_source(package, None).await?;
128 source.get_release(package, version).await
129 }
130
131 pub async fn stream_content<'a>(
134 &'a self,
135 package: &'a PackageRef,
136 release: &'a Release,
137 ) -> Result<ContentStream, Error> {
138 let source = self.resolve_source(package, None).await?;
139 source.stream_content(package, release).await
140 }
141
142 pub async fn publish_release_file(
146 &self,
147 file: impl AsRef<Path>,
148 additional_options: PublishOpts,
149 ) -> Result<(PackageRef, Version), Error> {
150 let data = tokio::fs::OpenOptions::new().read(true).open(file).await?;
151
152 self.publish_release_data(Box::pin(data), additional_options)
153 .await
154 }
155
156 pub async fn publish_release_data(
160 &self,
161 data: PublishingSource,
162 additional_options: PublishOpts,
163 ) -> Result<(PackageRef, Version), Error> {
164 let (data, package, version) = if let Some((p, v)) = additional_options.package {
165 (data, p, v)
166 } else {
167 let data = SyncIoBridge::new(data);
168 let (mut data, p, v) = tokio::task::spawn_blocking(|| resolve_package(data))
169 .await
170 .map_err(|e| {
171 crate::Error::IoError(std::io::Error::new(
172 std::io::ErrorKind::Other,
173 format!("Error when performing blocking IO: {e:?}"),
174 ))
175 })??;
176 data.rewind().await?;
178 (data, p, v)
179 };
180 let source = self
181 .resolve_source(&package, additional_options.registry)
182 .await?;
183 source
184 .publish(&package, &version, data)
185 .await
186 .map(|_| (package, version))
187 }
188
189 async fn resolve_source(
190 &self,
191 package: &PackageRef,
192 registry_override: Option<Registry>,
193 ) -> Result<Arc<InnerClient>, Error> {
194 let is_override = registry_override.is_some();
195 let registry = if let Some(registry) = registry_override {
196 registry
197 } else {
198 self.config
199 .resolve_registry(package)
200 .ok_or_else(|| Error::NoRegistryForNamespace(package.namespace().clone()))?
201 .to_owned()
202 };
203 let has_key = {
204 let sources = self.sources.read().await;
205 sources.contains_key(®istry)
206 };
207 if !has_key {
208 let registry_config = self
209 .config
210 .registry_config(®istry)
211 .cloned()
212 .unwrap_or_default();
213
214 let should_fetch_meta = registry_config.default_backend() != Some("local");
216 let maybe_metadata = self
217 .config
218 .package_registry_override(package)
219 .and_then(|mapping| match mapping {
220 RegistryMapping::Custom(custom) => Some(custom.metadata.clone()),
221 _ => None,
222 })
223 .or_else(|| {
224 self.config
225 .namespace_registry(package.namespace())
226 .and_then(|meta| {
227 match (meta, is_override) {
231 (RegistryMapping::Custom(custom), true)
232 if custom.registry == registry =>
233 {
234 Some(custom.metadata.clone())
235 }
236 (RegistryMapping::Custom(custom), false) => {
237 Some(custom.metadata.clone())
238 }
239 _ => None,
240 }
241 })
242 });
243
244 let registry_meta = if let Some(meta) = maybe_metadata {
245 meta
246 } else if should_fetch_meta {
247 RegistryMetadata::fetch_or_default(®istry).await
248 } else {
249 RegistryMetadata::default()
250 };
251
252 let backend_type = match registry_config.default_backend() {
254 Some(backend_type) => Some(backend_type),
256 None => {
257 let preferred_protocol = registry_meta.preferred_protocol();
259 if preferred_protocol == Some("local") {
261 return Err(Error::InvalidRegistryMetadata(anyhow!(
262 "registry metadata with 'local' protocol not allowed"
263 )));
264 }
265 preferred_protocol
266 }
267 }
268 .unwrap_or("oci");
270 tracing::debug!(?backend_type, "Resolved backend type");
271
272 let source: InnerClient = match backend_type {
273 "local" => Box::new(LocalBackend::new(registry_config)?),
274 "oci" => Box::new(OciBackend::new(
275 ®istry,
276 ®istry_config,
277 ®istry_meta,
278 )?),
279 "warg" => {
280 Box::new(WargBackend::new(®istry, ®istry_config, ®istry_meta).await?)
281 }
282 other => {
283 return Err(Error::InvalidConfig(anyhow!(
284 "unknown backend type {other:?}"
285 )));
286 }
287 };
288 self.sources
289 .write()
290 .await
291 .insert(registry.clone(), Arc::new(source));
292 }
293 Ok(self.sources.read().await.get(®istry).unwrap().clone())
294 }
295}
296
297fn resolve_package(
301 mut data: SyncIoBridge<PublishingSource>,
302) -> Result<(PublishingSource, PackageRef, Version), Error> {
303 let (resolve, package_id) =
304 match wit_component::decode_reader(&mut data).map_err(crate::Error::InvalidComponent)? {
305 DecodedWasm::Component(resolve, world_id) => {
306 let package_id = resolve
307 .worlds
308 .iter()
309 .find_map(|(id, w)| if id == world_id { w.package } else { None })
310 .ok_or_else(|| {
311 crate::Error::InvalidComponent(anyhow::anyhow!(
312 "component world or package not found"
313 ))
314 })?;
315 (resolve, package_id)
316 }
317 DecodedWasm::WitPackage(resolve, package_id) => (resolve, package_id),
318 };
319 let (package, version) = resolve
320 .package_names
321 .into_iter()
322 .find_map(|(pkg, id)| {
323 (id == package_id).then(|| {
326 (
327 PackageRef::new(
328 pkg.namespace.try_into().unwrap(),
329 pkg.name.try_into().unwrap(),
330 ),
331 pkg.version,
332 )
333 })
334 })
335 .ok_or_else(|| {
336 crate::Error::InvalidComponent(anyhow::anyhow!("component package not found"))
337 })?;
338
339 let version = version.ok_or_else(|| {
340 crate::Error::InvalidComponent(anyhow::anyhow!("component package version not found"))
341 })?;
342 Ok((data.into_inner(), package, version))
343}