1pub mod caching;
30mod loader;
31pub mod local;
32pub mod metadata;
33pub mod oci;
34mod publisher;
35mod release;
36pub mod warg;
37
38use std::path::Path;
39use std::sync::Arc;
40use std::{collections::HashMap, pin::Pin};
41
42use anyhow::anyhow;
43use bytes::Bytes;
44use futures_util::Stream;
45use publisher::PackagePublisher;
46use tokio::io::AsyncSeekExt;
47use tokio::sync::RwLock;
48use tokio_util::io::SyncIoBridge;
49pub use wasm_pkg_common::{
50 config::{Config, CustomConfig, RegistryMapping},
51 digest::ContentDigest,
52 metadata::RegistryMetadata,
53 package::{PackageRef, Version},
54 registry::Registry,
55 Error,
56};
57use wit_component::DecodedWasm;
58
59use crate::metadata::RegistryMetadataExt;
60use crate::{loader::PackageLoader, local::LocalBackend, oci::OciBackend, warg::WargBackend};
61
62pub use release::{Release, VersionInfo};
63
64pub type ContentStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>>;
66
67pub type PublishingSource = Pin<Box<dyn ReaderSeeker + Send + Sync + 'static>>;
69
70pub trait ReaderSeeker: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
72impl<T> ReaderSeeker for T where T: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
73
74trait LoaderPublisher: PackageLoader + PackagePublisher {}
75
76impl<T> LoaderPublisher for T where T: PackageLoader + PackagePublisher {}
77
78type RegistrySources = HashMap<Registry, Arc<InnerClient>>;
79type InnerClient = Box<dyn LoaderPublisher + Sync>;
80
81#[derive(Clone, Debug, Default)]
83pub struct PublishOpts {
84 pub package: Option<(PackageRef, Version)>,
86 pub registry: Option<Registry>,
88}
89
90#[derive(Clone)]
92pub struct Client {
93 config: Arc<Config>,
94 sources: Arc<RwLock<RegistrySources>>,
95}
96
97impl Client {
98 pub fn new(config: Config) -> Self {
100 Self {
101 config: Arc::new(config),
102 sources: Default::default(),
103 }
104 }
105
106 pub fn config(&self) -> &Config {
108 &self.config
109 }
110
111 pub async fn with_global_defaults() -> Result<Self, Error> {
113 let config = Config::global_defaults().await?;
114 Ok(Self::new(config))
115 }
116
117 pub async fn list_all_versions(&self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
119 let source = self.resolve_source(package, None).await?;
120 source.list_all_versions(package).await
121 }
122
123 pub async fn get_release(
125 &self,
126 package: &PackageRef,
127 version: &Version,
128 ) -> Result<Release, Error> {
129 let source = self.resolve_source(package, None).await?;
130 source.get_release(package, version).await
131 }
132
133 pub async fn stream_content<'a>(
136 &'a self,
137 package: &'a PackageRef,
138 release: &'a Release,
139 ) -> Result<ContentStream, Error> {
140 let source = self.resolve_source(package, None).await?;
141 source.stream_content(package, release).await
142 }
143
144 pub async fn publish_release_file(
148 &self,
149 file: impl AsRef<Path>,
150 additional_options: PublishOpts,
151 ) -> Result<(PackageRef, Version), Error> {
152 let data = tokio::fs::OpenOptions::new().read(true).open(file).await?;
153
154 self.publish_release_data(Box::pin(data), additional_options)
155 .await
156 }
157
158 pub async fn publish_release_data(
162 &self,
163 data: PublishingSource,
164 additional_options: PublishOpts,
165 ) -> Result<(PackageRef, Version), Error> {
166 let (data, package, version) = if let Some((p, v)) = additional_options.package {
167 (data, p, v)
168 } else {
169 let data = SyncIoBridge::new(data);
170 let (mut data, p, v) = tokio::task::spawn_blocking(|| resolve_package(data))
171 .await
172 .map_err(|e| {
173 crate::Error::IoError(std::io::Error::other(format!(
174 "Error when performing blocking IO: {e:?}"
175 )))
176 })??;
177 data.rewind().await?;
179 (data, p, v)
180 };
181 let source = self
182 .resolve_source(&package, additional_options.registry)
183 .await?;
184 source
185 .publish(&package, &version, data)
186 .await
187 .map(|_| (package, version))
188 }
189
190 async fn resolve_source(
191 &self,
192 package: &PackageRef,
193 registry_override: Option<Registry>,
194 ) -> Result<Arc<InnerClient>, Error> {
195 let is_override = registry_override.is_some();
196 let registry = if let Some(registry) = registry_override {
197 registry
198 } else {
199 self.config
200 .resolve_registry(package)
201 .ok_or_else(|| Error::NoRegistryForNamespace(package.namespace().clone()))?
202 .to_owned()
203 };
204 let has_key = {
205 let sources = self.sources.read().await;
206 sources.contains_key(®istry)
207 };
208 if !has_key {
209 let registry_config = self
210 .config
211 .registry_config(®istry)
212 .cloned()
213 .unwrap_or_default();
214
215 let should_fetch_meta = registry_config.default_backend() != Some("local");
217 let maybe_metadata = self
218 .config
219 .package_registry_override(package)
220 .and_then(|mapping| match mapping {
221 RegistryMapping::Custom(custom) => Some(custom.metadata.clone()),
222 _ => None,
223 })
224 .or_else(|| {
225 self.config
226 .namespace_registry(package.namespace())
227 .and_then(|meta| {
228 match (meta, is_override) {
232 (RegistryMapping::Custom(custom), true)
233 if custom.registry == registry =>
234 {
235 Some(custom.metadata.clone())
236 }
237 (RegistryMapping::Custom(custom), false) => {
238 Some(custom.metadata.clone())
239 }
240 _ => None,
241 }
242 })
243 });
244
245 let registry_meta = if let Some(meta) = maybe_metadata {
246 meta
247 } else if should_fetch_meta {
248 RegistryMetadata::fetch_or_default(®istry).await
249 } else {
250 RegistryMetadata::default()
251 };
252
253 let backend_type = match registry_config.default_backend() {
255 Some(backend_type) => Some(backend_type),
257 None => {
258 let preferred_protocol = registry_meta.preferred_protocol();
260 if preferred_protocol == Some("local") {
262 return Err(Error::InvalidRegistryMetadata(anyhow!(
263 "registry metadata with 'local' protocol not allowed"
264 )));
265 }
266 preferred_protocol
267 }
268 }
269 .unwrap_or("oci");
271 tracing::debug!(?backend_type, "Resolved backend type");
272
273 let source: InnerClient = match backend_type {
274 "local" => Box::new(LocalBackend::new(registry_config)?),
275 "oci" => Box::new(OciBackend::new(
276 ®istry,
277 ®istry_config,
278 ®istry_meta,
279 )?),
280 "warg" => {
281 Box::new(WargBackend::new(®istry, ®istry_config, ®istry_meta).await?)
282 }
283 other => {
284 return Err(Error::InvalidConfig(anyhow!(
285 "unknown backend type {other:?}"
286 )));
287 }
288 };
289 self.sources
290 .write()
291 .await
292 .insert(registry.clone(), Arc::new(source));
293 }
294 Ok(self.sources.read().await.get(®istry).unwrap().clone())
295 }
296}
297
298fn resolve_package(
302 mut data: SyncIoBridge<PublishingSource>,
303) -> Result<(PublishingSource, PackageRef, Version), Error> {
304 let (resolve, package_id) =
305 match wit_component::decode_reader(&mut data).map_err(crate::Error::InvalidComponent)? {
306 DecodedWasm::Component(resolve, world_id) => {
307 let package_id = resolve
308 .worlds
309 .iter()
310 .find_map(|(id, w)| if id == world_id { w.package } else { None })
311 .ok_or_else(|| {
312 crate::Error::InvalidComponent(anyhow::anyhow!(
313 "component world or package not found"
314 ))
315 })?;
316 (resolve, package_id)
317 }
318 DecodedWasm::WitPackage(resolve, package_id) => (resolve, package_id),
319 };
320 let (package, version) = resolve
321 .package_names
322 .into_iter()
323 .find_map(|(pkg, id)| {
324 (id == package_id).then(|| {
327 (
328 PackageRef::new(
329 pkg.namespace.try_into().unwrap(),
330 pkg.name.try_into().unwrap(),
331 ),
332 pkg.version,
333 )
334 })
335 })
336 .ok_or_else(|| {
337 crate::Error::InvalidComponent(anyhow::anyhow!("component package not found"))
338 })?;
339
340 let version = version.ok_or_else(|| {
341 crate::Error::InvalidComponent(anyhow::anyhow!("component package version not found"))
342 })?;
343 Ok((data.into_inner(), package, version))
344}