pub mod caching;
mod loader;
pub mod local;
pub mod metadata;
pub mod oci;
mod publisher;
mod release;
pub mod warg;
use std::path::Path;
use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use anyhow::anyhow;
use bytes::Bytes;
use futures_util::Stream;
use publisher::PackagePublisher;
use tokio::io::AsyncSeekExt;
use tokio::sync::RwLock;
use tokio_util::io::SyncIoBridge;
pub use wasm_pkg_common::{
config::{Config, CustomConfig, RegistryMapping},
digest::ContentDigest,
metadata::RegistryMetadata,
package::{PackageRef, Version},
registry::Registry,
Error,
};
use wit_component::DecodedWasm;
use crate::metadata::RegistryMetadataExt;
use crate::{loader::PackageLoader, local::LocalBackend, oci::OciBackend, warg::WargBackend};
pub use release::{Release, VersionInfo};
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>>;
pub type PublishingSource = Pin<Box<dyn ReaderSeeker + Send + Sync + 'static>>;
pub trait ReaderSeeker: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
impl<T> ReaderSeeker for T where T: tokio::io::AsyncRead + tokio::io::AsyncSeek {}
trait LoaderPublisher: PackageLoader + PackagePublisher {}
impl<T> LoaderPublisher for T where T: PackageLoader + PackagePublisher {}
type RegistrySources = HashMap<Registry, Arc<InnerClient>>;
type InnerClient = Box<dyn LoaderPublisher + Sync>;
#[derive(Clone, Debug, Default)]
pub struct PublishOpts {
pub package: Option<(PackageRef, Version)>,
pub registry: Option<Registry>,
}
#[derive(Clone)]
pub struct Client {
config: Arc<Config>,
sources: Arc<RwLock<RegistrySources>>,
}
impl Client {
pub fn new(config: Config) -> Self {
Self {
config: Arc::new(config),
sources: Default::default(),
}
}
pub fn config(&self) -> &Config {
&self.config
}
pub async fn with_global_defaults() -> Result<Self, Error> {
let config = Config::global_defaults().await?;
Ok(Self::new(config))
}
pub async fn list_all_versions(&self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
let source = self.resolve_source(package, None).await?;
source.list_all_versions(package).await
}
pub async fn get_release(
&self,
package: &PackageRef,
version: &Version,
) -> Result<Release, Error> {
let source = self.resolve_source(package, None).await?;
source.get_release(package, version).await
}
pub async fn stream_content<'a>(
&'a self,
package: &'a PackageRef,
release: &'a Release,
) -> Result<ContentStream, Error> {
let source = self.resolve_source(package, None).await?;
source.stream_content(package, release).await
}
pub async fn publish_release_file(
&self,
file: impl AsRef<Path>,
additional_options: PublishOpts,
) -> Result<(PackageRef, Version), Error> {
let data = tokio::fs::OpenOptions::new().read(true).open(file).await?;
self.publish_release_data(Box::pin(data), additional_options)
.await
}
pub async fn publish_release_data(
&self,
data: PublishingSource,
additional_options: PublishOpts,
) -> Result<(PackageRef, Version), Error> {
let (data, package, version) = if let Some((p, v)) = additional_options.package {
(data, p, v)
} else {
let data = SyncIoBridge::new(data);
let (mut data, p, v) = tokio::task::spawn_blocking(|| resolve_package(data))
.await
.map_err(|e| {
crate::Error::IoError(std::io::Error::other(format!(
"Error when performing blocking IO: {e:?}"
)))
})??;
data.rewind().await?;
(data, p, v)
};
let source = self
.resolve_source(&package, additional_options.registry)
.await?;
source
.publish(&package, &version, data)
.await
.map(|_| (package, version))
}
async fn resolve_source(
&self,
package: &PackageRef,
registry_override: Option<Registry>,
) -> Result<Arc<InnerClient>, Error> {
let is_override = registry_override.is_some();
let registry = if let Some(registry) = registry_override {
registry
} else {
self.config
.resolve_registry(package)
.ok_or_else(|| Error::NoRegistryForNamespace(package.namespace().clone()))?
.to_owned()
};
let has_key = {
let sources = self.sources.read().await;
sources.contains_key(®istry)
};
if !has_key {
let registry_config = self
.config
.registry_config(®istry)
.cloned()
.unwrap_or_default();
let should_fetch_meta = registry_config.default_backend() != Some("local");
let maybe_metadata = self
.config
.package_registry_override(package)
.and_then(|mapping| match mapping {
RegistryMapping::Custom(custom) => Some(custom.metadata.clone()),
_ => None,
})
.or_else(|| {
self.config
.namespace_registry(package.namespace())
.and_then(|meta| {
match (meta, is_override) {
(RegistryMapping::Custom(custom), true)
if custom.registry == registry =>
{
Some(custom.metadata.clone())
}
(RegistryMapping::Custom(custom), false) => {
Some(custom.metadata.clone())
}
_ => None,
}
})
});
let registry_meta = if let Some(meta) = maybe_metadata {
meta
} else if should_fetch_meta {
RegistryMetadata::fetch_or_default(®istry).await
} else {
RegistryMetadata::default()
};
let backend_type = match registry_config.default_backend() {
Some(backend_type) => Some(backend_type),
None => {
let preferred_protocol = registry_meta.preferred_protocol();
if preferred_protocol == Some("local") {
return Err(Error::InvalidRegistryMetadata(anyhow!(
"registry metadata with 'local' protocol not allowed"
)));
}
preferred_protocol
}
}
.unwrap_or("oci");
tracing::debug!(?backend_type, "Resolved backend type");
let source: InnerClient = match backend_type {
"local" => Box::new(LocalBackend::new(registry_config)?),
"oci" => Box::new(OciBackend::new(
®istry,
®istry_config,
®istry_meta,
)?),
"warg" => {
Box::new(WargBackend::new(®istry, ®istry_config, ®istry_meta).await?)
}
other => {
return Err(Error::InvalidConfig(anyhow!(
"unknown backend type {other:?}"
)));
}
};
self.sources
.write()
.await
.insert(registry.clone(), Arc::new(source));
}
Ok(self.sources.read().await.get(®istry).unwrap().clone())
}
}
fn resolve_package(
mut data: SyncIoBridge<PublishingSource>,
) -> Result<(PublishingSource, PackageRef, Version), Error> {
let (resolve, package_id) =
match wit_component::decode_reader(&mut data).map_err(crate::Error::InvalidComponent)? {
DecodedWasm::Component(resolve, world_id) => {
let package_id = resolve
.worlds
.iter()
.find_map(|(id, w)| if id == world_id { w.package } else { None })
.ok_or_else(|| {
crate::Error::InvalidComponent(anyhow::anyhow!(
"component world or package not found"
))
})?;
(resolve, package_id)
}
DecodedWasm::WitPackage(resolve, package_id) => (resolve, package_id),
};
let (package, version) = resolve
.package_names
.into_iter()
.find_map(|(pkg, id)| {
(id == package_id).then(|| {
(
PackageRef::new(
pkg.namespace.try_into().unwrap(),
pkg.name.try_into().unwrap(),
),
pkg.version,
)
})
})
.ok_or_else(|| {
crate::Error::InvalidComponent(anyhow::anyhow!("component package not found"))
})?;
let version = version.ok_or_else(|| {
crate::Error::InvalidComponent(anyhow::anyhow!("component package version not found"))
})?;
Ok((data.into_inner(), package, version))
}