wasm_pkg_client/
lib.rs

//! Wasm Package Client
//!
//! [`Client`] implements a unified interface for loading package content from
//! multiple kinds of package registries.
//!
//! # Example
//!
//! ```no_run
//! # async fn example() -> anyhow::Result<()> {
//! // Initialize client from global configuration.
//! let client = wasm_pkg_client::Client::with_global_defaults().await?;
//!
//! // Get a specific package release version.
//! let pkg = "example:pkg".parse()?;
//! let version = "1.0.0".parse()?;
//! let release = client.get_release(&pkg, &version).await?;
//!
//! // Stream release content to a file.
//! let mut stream = client.stream_content(&pkg, &release).await?;
//! let mut file = tokio::fs::File::create("output.wasm").await?;
//! use futures_util::TryStreamExt;
//! use tokio::io::AsyncWriteExt;
//! while let Some(chunk) = stream.try_next().await? {
//!     file.write_all(&chunk).await?;
//! }
//! # Ok(()) }
//! ```
28
29pub mod caching;
30mod loader;
31pub mod local;
32pub mod oci;
33mod publisher;
34mod release;
35pub mod warg;
36
37use std::path::Path;
38use std::sync::Arc;
39use std::{collections::HashMap, pin::Pin};
40
41use anyhow::anyhow;
42use bytes::Bytes;
43use futures_util::Stream;
44use publisher::PackagePublisher;
45use tokio::io::AsyncSeekExt;
46use tokio::sync::RwLock;
47use tokio_util::io::SyncIoBridge;
48pub use wasm_pkg_common::{
49    config::{Config, CustomConfig, RegistryMapping},
50    digest::ContentDigest,
51    metadata::RegistryMetadata,
52    package::{PackageRef, Version},
53    registry::Registry,
54    Error,
55};
56use wit_component::DecodedWasm;
57
58use crate::{loader::PackageLoader, local::LocalBackend, oci::OciBackend, warg::WargBackend};
59
60pub use release::{Release, VersionInfo};
61
62/// An alias for a stream of content bytes
63pub type ContentStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>>;
64
65/// An alias for a PublishingSource (generally a file)
66pub type PublishingSource = Pin<Box<dyn ReaderSeeker + Send + Sync + 'static>>;
67
68/// A supertrait combining tokio's AsyncRead and AsyncSeek.
69pub trait ReaderSeeker: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
70impl<T> ReaderSeeker for T where T: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
71
72trait LoaderPublisher: PackageLoader + PackagePublisher {}
73
74impl<T> LoaderPublisher for T where T: PackageLoader + PackagePublisher {}
75
76type RegistrySources = HashMap<Registry, Arc<InnerClient>>;
77type InnerClient = Box<dyn LoaderPublisher + Sync>;
78
79/// Additional options for publishing a package.
80#[derive(Clone, Debug, Default)]
81pub struct PublishOpts {
82    /// Override the package name and version to publish with.
83    pub package: Option<(PackageRef, Version)>,
84    /// Override the registry to publish to.
85    pub registry: Option<Registry>,
86}
87
88/// A read-only registry client.
89#[derive(Clone)]
90pub struct Client {
91    config: Arc<Config>,
92    sources: Arc<RwLock<RegistrySources>>,
93}
94
95impl Client {
96    /// Returns a new client with the given [`Config`].
97    pub fn new(config: Config) -> Self {
98        Self {
99            config: Arc::new(config),
100            sources: Default::default(),
101        }
102    }
103
104    /// Returns a reference to the configuration this client was initialized with.
105    pub fn config(&self) -> &Config {
106        &self.config
107    }
108
109    /// Returns a new client configured from default global config.
110    pub async fn with_global_defaults() -> Result<Self, Error> {
111        let config = Config::global_defaults().await?;
112        Ok(Self::new(config))
113    }
114
115    /// Returns a list of all package [`Version`]s available for the given package.
116    pub async fn list_all_versions(&self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
117        let source = self.resolve_source(package, None).await?;
118        source.list_all_versions(package).await
119    }
120
121    /// Returns a [`Release`] for the given package version.
122    pub async fn get_release(
123        &self,
124        package: &PackageRef,
125        version: &Version,
126    ) -> Result<Release, Error> {
127        let source = self.resolve_source(package, None).await?;
128        source.get_release(package, version).await
129    }
130
131    /// Returns a [`ContentStream`] of content chunks. Contents are validated
132    /// against the given [`Release::content_digest`].
133    pub async fn stream_content<'a>(
134        &'a self,
135        package: &'a PackageRef,
136        release: &'a Release,
137    ) -> Result<ContentStream, Error> {
138        let source = self.resolve_source(package, None).await?;
139        source.stream_content(package, release).await
140    }
141
142    /// Publishes the given file as a package release. The package name and version will be read
143    /// from the component if not given as part of `additional_options`. Returns the package name
144    /// and version of the published release.
145    pub async fn publish_release_file(
146        &self,
147        file: impl AsRef<Path>,
148        additional_options: PublishOpts,
149    ) -> Result<(PackageRef, Version), Error> {
150        let data = tokio::fs::OpenOptions::new().read(true).open(file).await?;
151
152        self.publish_release_data(Box::pin(data), additional_options)
153            .await
154    }
155
156    /// Publishes the given reader as a package release. TThe package name and version will be read
157    /// from the component if not given as part of `additional_options`. Returns the package name
158    /// and version of the published release.
159    pub async fn publish_release_data(
160        &self,
161        data: PublishingSource,
162        additional_options: PublishOpts,
163    ) -> Result<(PackageRef, Version), Error> {
164        let (data, package, version) = if let Some((p, v)) = additional_options.package {
165            (data, p, v)
166        } else {
167            let data = SyncIoBridge::new(data);
168            let (mut data, p, v) = tokio::task::spawn_blocking(|| resolve_package(data))
169                .await
170                .map_err(|e| {
171                    crate::Error::IoError(std::io::Error::new(
172                        std::io::ErrorKind::Other,
173                        format!("Error when performing blocking IO: {e:?}"),
174                    ))
175                })??;
176            // We must rewind the reader because we read to the end to parse the component.
177            data.rewind().await?;
178            (data, p, v)
179        };
180        let source = self
181            .resolve_source(&package, additional_options.registry)
182            .await?;
183        source
184            .publish(&package, &version, data)
185            .await
186            .map(|_| (package, version))
187    }
188
189    async fn resolve_source(
190        &self,
191        package: &PackageRef,
192        registry_override: Option<Registry>,
193    ) -> Result<Arc<InnerClient>, Error> {
194        let is_override = registry_override.is_some();
195        let registry = if let Some(registry) = registry_override {
196            registry
197        } else {
198            self.config
199                .resolve_registry(package)
200                .ok_or_else(|| Error::NoRegistryForNamespace(package.namespace().clone()))?
201                .to_owned()
202        };
203        let has_key = {
204            let sources = self.sources.read().await;
205            sources.contains_key(&registry)
206        };
207        if !has_key {
208            let registry_config = self
209                .config
210                .registry_config(&registry)
211                .cloned()
212                .unwrap_or_default();
213
214            // Skip fetching metadata for "local" source
215            let should_fetch_meta = registry_config.default_backend() != Some("local");
216            let maybe_metadata = self
217                .config
218                .package_registry_override(package)
219                .and_then(|mapping| match mapping {
220                    RegistryMapping::Custom(custom) => Some(custom.metadata.clone()),
221                    _ => None,
222                })
223                .or_else(|| {
224                    self.config
225                        .namespace_registry(package.namespace())
226                        .and_then(|meta| {
227                            // If the overriden registry matches the registry we are trying to resolve, we
228                            // should use the metadata, otherwise we'll need to fetch the metadata from the
229                            // registry
230                            match (meta, is_override) {
231                                (RegistryMapping::Custom(custom), true)
232                                    if custom.registry == registry =>
233                                {
234                                    Some(custom.metadata.clone())
235                                }
236                                (RegistryMapping::Custom(custom), false) => {
237                                    Some(custom.metadata.clone())
238                                }
239                                _ => None,
240                            }
241                        })
242                });
243
244            let registry_meta = if let Some(meta) = maybe_metadata {
245                meta
246            } else if should_fetch_meta {
247                RegistryMetadata::fetch_or_default(&registry).await
248            } else {
249                RegistryMetadata::default()
250            };
251
252            // Resolve backend type
253            let backend_type = match registry_config.default_backend() {
254                // If the local config specifies a backend type, use it
255                Some(backend_type) => Some(backend_type),
256                None => {
257                    // If the registry metadata indicates a preferred protocol, use it
258                    let preferred_protocol = registry_meta.preferred_protocol();
259                    // ...except registry metadata cannot force a local backend
260                    if preferred_protocol == Some("local") {
261                        return Err(Error::InvalidRegistryMetadata(anyhow!(
262                            "registry metadata with 'local' protocol not allowed"
263                        )));
264                    }
265                    preferred_protocol
266                }
267            }
268            // Otherwise use the default backend
269            .unwrap_or("oci");
270            tracing::debug!(?backend_type, "Resolved backend type");
271
272            let source: InnerClient = match backend_type {
273                "local" => Box::new(LocalBackend::new(registry_config)?),
274                "oci" => Box::new(OciBackend::new(
275                    &registry,
276                    &registry_config,
277                    &registry_meta,
278                )?),
279                "warg" => {
280                    Box::new(WargBackend::new(&registry, &registry_config, &registry_meta).await?)
281                }
282                other => {
283                    return Err(Error::InvalidConfig(anyhow!(
284                        "unknown backend type {other:?}"
285                    )));
286                }
287            };
288            self.sources
289                .write()
290                .await
291                .insert(registry.clone(), Arc::new(source));
292        }
293        Ok(self.sources.read().await.get(&registry).unwrap().clone())
294    }
295}
296
297/// Resolves the package name and version from the given source. This takes a wrapped publishing
298/// source to it can do a blocking read with wit_component. It returns back the underlying
299/// PublishingSource but should be rewound to the beginning of the source
300fn resolve_package(
301    mut data: SyncIoBridge<PublishingSource>,
302) -> Result<(PublishingSource, PackageRef, Version), Error> {
303    let (resolve, package_id) =
304        match wit_component::decode_reader(&mut data).map_err(crate::Error::InvalidComponent)? {
305            DecodedWasm::Component(resolve, world_id) => {
306                let package_id = resolve
307                    .worlds
308                    .iter()
309                    .find_map(|(id, w)| if id == world_id { w.package } else { None })
310                    .ok_or_else(|| {
311                        crate::Error::InvalidComponent(anyhow::anyhow!(
312                            "component world or package not found"
313                        ))
314                    })?;
315                (resolve, package_id)
316            }
317            DecodedWasm::WitPackage(resolve, package_id) => (resolve, package_id),
318        };
319    let (package, version) = resolve
320        .package_names
321        .into_iter()
322        .find_map(|(pkg, id)| {
323            // SAFETY: We just parsed this from wit and should be able to unwrap. If it
324            // isn't a valid identifier, something else is majorly wrong
325            (id == package_id).then(|| {
326                (
327                    PackageRef::new(
328                        pkg.namespace.try_into().unwrap(),
329                        pkg.name.try_into().unwrap(),
330                    ),
331                    pkg.version,
332                )
333            })
334        })
335        .ok_or_else(|| {
336            crate::Error::InvalidComponent(anyhow::anyhow!("component package not found"))
337        })?;
338
339    let version = version.ok_or_else(|| {
340        crate::Error::InvalidComponent(anyhow::anyhow!("component package version not found"))
341    })?;
342    Ok((data.into_inner(), package, version))
343}