wasm_pkg_client/
lib.rs

1//! Wasm Package Client
2//!
3//! [`Client`] implements a unified interface for loading package content from
4//! multiple kinds of package registries.
5//!
6//! # Example
7//!
8//! ```no_run
9//! # async fn example() -> anyhow::Result<()> {
10//! // Initialize client from global configuration.
11//! let mut client = wasm_pkg_client::Client::with_global_defaults().await?;
12//!
13//! // Get a specific package release version.
14//! let pkg = "example:pkg".parse()?;
15//! let version = "1.0.0".parse()?;
16//! let release = client.get_release(&pkg, &version).await?;
17//!
18//! // Stream release content to a file.
19//! let mut stream = client.stream_content(&pkg, &release).await?;
20//! let mut file = tokio::fs::File::create("output.wasm").await?;
21//! use futures_util::TryStreamExt;
22//! use tokio::io::AsyncWriteExt;
23//! while let Some(chunk) = stream.try_next().await? {
24//!     file.write_all(&chunk).await?;
25//! }
26//! # Ok(()) }
27//! ```
28
29pub mod caching;
30mod loader;
31pub mod local;
32pub mod metadata;
33pub mod oci;
34mod publisher;
35mod release;
36pub mod warg;
37
38use std::path::Path;
39use std::sync::Arc;
40use std::{collections::HashMap, pin::Pin};
41
42use anyhow::anyhow;
43use bytes::Bytes;
44use futures_util::Stream;
45use publisher::PackagePublisher;
46use tokio::io::AsyncSeekExt;
47use tokio::sync::RwLock;
48use tokio_util::io::SyncIoBridge;
49pub use wasm_pkg_common::{
50    config::{Config, CustomConfig, RegistryMapping},
51    digest::ContentDigest,
52    metadata::RegistryMetadata,
53    package::{PackageRef, Version},
54    registry::Registry,
55    Error,
56};
57use wit_component::DecodedWasm;
58
59use crate::metadata::RegistryMetadataExt;
60use crate::{loader::PackageLoader, local::LocalBackend, oci::OciBackend, warg::WargBackend};
61
62pub use release::{Release, VersionInfo};
63
64/// An alias for a stream of content bytes
65pub type ContentStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>>;
66
67/// An alias for a PublishingSource (generally a file)
68pub type PublishingSource = Pin<Box<dyn ReaderSeeker + Send + Sync + 'static>>;
69
70/// A supertrait combining tokio's AsyncRead and AsyncSeek.
71pub trait ReaderSeeker: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
72impl<T> ReaderSeeker for T where T: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
73
74trait LoaderPublisher: PackageLoader + PackagePublisher {}
75
76impl<T> LoaderPublisher for T where T: PackageLoader + PackagePublisher {}
77
78type RegistrySources = HashMap<Registry, Arc<InnerClient>>;
79type InnerClient = Box<dyn LoaderPublisher + Sync>;
80
81/// Additional options for publishing a package.
82#[derive(Clone, Debug, Default)]
83pub struct PublishOpts {
84    /// Override the package name and version to publish with.
85    pub package: Option<(PackageRef, Version)>,
86    /// Override the registry to publish to.
87    pub registry: Option<Registry>,
88}
89
90/// A read-only registry client.
91#[derive(Clone)]
92pub struct Client {
93    config: Arc<Config>,
94    sources: Arc<RwLock<RegistrySources>>,
95}
96
97impl Client {
98    /// Returns a new client with the given [`Config`].
99    pub fn new(config: Config) -> Self {
100        Self {
101            config: Arc::new(config),
102            sources: Default::default(),
103        }
104    }
105
106    /// Returns a reference to the configuration this client was initialized with.
107    pub fn config(&self) -> &Config {
108        &self.config
109    }
110
111    /// Returns a new client configured from default global config.
112    pub async fn with_global_defaults() -> Result<Self, Error> {
113        let config = Config::global_defaults().await?;
114        Ok(Self::new(config))
115    }
116
117    /// Returns a list of all package [`Version`]s available for the given package.
118    pub async fn list_all_versions(&self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
119        let source = self.resolve_source(package, None).await?;
120        source.list_all_versions(package).await
121    }
122
123    /// Returns a [`Release`] for the given package version.
124    pub async fn get_release(
125        &self,
126        package: &PackageRef,
127        version: &Version,
128    ) -> Result<Release, Error> {
129        let source = self.resolve_source(package, None).await?;
130        source.get_release(package, version).await
131    }
132
133    /// Returns a [`ContentStream`] of content chunks. Contents are validated
134    /// against the given [`Release::content_digest`].
135    pub async fn stream_content<'a>(
136        &'a self,
137        package: &'a PackageRef,
138        release: &'a Release,
139    ) -> Result<ContentStream, Error> {
140        let source = self.resolve_source(package, None).await?;
141        source.stream_content(package, release).await
142    }
143
144    /// Publishes the given file as a package release. The package name and version will be read
145    /// from the component if not given as part of `additional_options`. Returns the package name
146    /// and version of the published release.
147    pub async fn publish_release_file(
148        &self,
149        file: impl AsRef<Path>,
150        additional_options: PublishOpts,
151    ) -> Result<(PackageRef, Version), Error> {
152        let data = tokio::fs::OpenOptions::new().read(true).open(file).await?;
153
154        self.publish_release_data(Box::pin(data), additional_options)
155            .await
156    }
157
158    /// Publishes the given reader as a package release. TThe package name and version will be read
159    /// from the component if not given as part of `additional_options`. Returns the package name
160    /// and version of the published release.
161    pub async fn publish_release_data(
162        &self,
163        data: PublishingSource,
164        additional_options: PublishOpts,
165    ) -> Result<(PackageRef, Version), Error> {
166        let (data, package, version) = if let Some((p, v)) = additional_options.package {
167            (data, p, v)
168        } else {
169            let data = SyncIoBridge::new(data);
170            let (mut data, p, v) = tokio::task::spawn_blocking(|| resolve_package(data))
171                .await
172                .map_err(|e| {
173                    crate::Error::IoError(std::io::Error::other(format!(
174                        "Error when performing blocking IO: {e:?}"
175                    )))
176                })??;
177            // We must rewind the reader because we read to the end to parse the component.
178            data.rewind().await?;
179            (data, p, v)
180        };
181        let source = self
182            .resolve_source(&package, additional_options.registry)
183            .await?;
184        source
185            .publish(&package, &version, data)
186            .await
187            .map(|_| (package, version))
188    }
189
190    async fn resolve_source(
191        &self,
192        package: &PackageRef,
193        registry_override: Option<Registry>,
194    ) -> Result<Arc<InnerClient>, Error> {
195        let is_override = registry_override.is_some();
196        let registry = if let Some(registry) = registry_override {
197            registry
198        } else {
199            self.config
200                .resolve_registry(package)
201                .ok_or_else(|| Error::NoRegistryForNamespace(package.namespace().clone()))?
202                .to_owned()
203        };
204        let has_key = {
205            let sources = self.sources.read().await;
206            sources.contains_key(&registry)
207        };
208        if !has_key {
209            let registry_config = self
210                .config
211                .registry_config(&registry)
212                .cloned()
213                .unwrap_or_default();
214
215            // Skip fetching metadata for "local" source
216            let should_fetch_meta = registry_config.default_backend() != Some("local");
217            let maybe_metadata = self
218                .config
219                .package_registry_override(package)
220                .and_then(|mapping| match mapping {
221                    RegistryMapping::Custom(custom) => Some(custom.metadata.clone()),
222                    _ => None,
223                })
224                .or_else(|| {
225                    self.config
226                        .namespace_registry(package.namespace())
227                        .and_then(|meta| {
228                            // If the overriden registry matches the registry we are trying to resolve, we
229                            // should use the metadata, otherwise we'll need to fetch the metadata from the
230                            // registry
231                            match (meta, is_override) {
232                                (RegistryMapping::Custom(custom), true)
233                                    if custom.registry == registry =>
234                                {
235                                    Some(custom.metadata.clone())
236                                }
237                                (RegistryMapping::Custom(custom), false) => {
238                                    Some(custom.metadata.clone())
239                                }
240                                _ => None,
241                            }
242                        })
243                });
244
245            let registry_meta = if let Some(meta) = maybe_metadata {
246                meta
247            } else if should_fetch_meta {
248                RegistryMetadata::fetch_or_default(&registry).await
249            } else {
250                RegistryMetadata::default()
251            };
252
253            // Resolve backend type
254            let backend_type = match registry_config.default_backend() {
255                // If the local config specifies a backend type, use it
256                Some(backend_type) => Some(backend_type),
257                None => {
258                    // If the registry metadata indicates a preferred protocol, use it
259                    let preferred_protocol = registry_meta.preferred_protocol();
260                    // ...except registry metadata cannot force a local backend
261                    if preferred_protocol == Some("local") {
262                        return Err(Error::InvalidRegistryMetadata(anyhow!(
263                            "registry metadata with 'local' protocol not allowed"
264                        )));
265                    }
266                    preferred_protocol
267                }
268            }
269            // Otherwise use the default backend
270            .unwrap_or("oci");
271            tracing::debug!(?backend_type, "Resolved backend type");
272
273            let source: InnerClient = match backend_type {
274                "local" => Box::new(LocalBackend::new(registry_config)?),
275                "oci" => Box::new(OciBackend::new(
276                    &registry,
277                    &registry_config,
278                    &registry_meta,
279                )?),
280                "warg" => {
281                    Box::new(WargBackend::new(&registry, &registry_config, &registry_meta).await?)
282                }
283                other => {
284                    return Err(Error::InvalidConfig(anyhow!(
285                        "unknown backend type {other:?}"
286                    )));
287                }
288            };
289            self.sources
290                .write()
291                .await
292                .insert(registry.clone(), Arc::new(source));
293        }
294        Ok(self.sources.read().await.get(&registry).unwrap().clone())
295    }
296}
297
298/// Resolves the package name and version from the given source. This takes a wrapped publishing
299/// source to it can do a blocking read with wit_component. It returns back the underlying
300/// PublishingSource but should be rewound to the beginning of the source
301fn resolve_package(
302    mut data: SyncIoBridge<PublishingSource>,
303) -> Result<(PublishingSource, PackageRef, Version), Error> {
304    let (resolve, package_id) =
305        match wit_component::decode_reader(&mut data).map_err(crate::Error::InvalidComponent)? {
306            DecodedWasm::Component(resolve, world_id) => {
307                let package_id = resolve
308                    .worlds
309                    .iter()
310                    .find_map(|(id, w)| if id == world_id { w.package } else { None })
311                    .ok_or_else(|| {
312                        crate::Error::InvalidComponent(anyhow::anyhow!(
313                            "component world or package not found"
314                        ))
315                    })?;
316                (resolve, package_id)
317            }
318            DecodedWasm::WitPackage(resolve, package_id) => (resolve, package_id),
319        };
320    let (package, version) = resolve
321        .package_names
322        .into_iter()
323        .find_map(|(pkg, id)| {
324            // SAFETY: We just parsed this from wit and should be able to unwrap. If it
325            // isn't a valid identifier, something else is majorly wrong
326            (id == package_id).then(|| {
327                (
328                    PackageRef::new(
329                        pkg.namespace.try_into().unwrap(),
330                        pkg.name.try_into().unwrap(),
331                    ),
332                    pkg.version,
333                )
334            })
335        })
336        .ok_or_else(|| {
337            crate::Error::InvalidComponent(anyhow::anyhow!("component package not found"))
338        })?;
339
340    let version = version.ok_or_else(|| {
341        crate::Error::InvalidComponent(anyhow::anyhow!("component package version not found"))
342    })?;
343    Ok((data.into_inner(), package, version))
344}