#![deny(missing_docs)]
use crate::storage::PackageInfo;
use anyhow::{anyhow, Context, Result};
use indexmap::IndexMap;
use reqwest::header::HeaderValue;
use reqwest::{Body, IntoUrl};
use secrecy::Secret;
use semver::{Version, VersionReq};
use std::cmp::Ordering;
use std::fs;
use std::str::FromStr;
use std::{borrow::Cow, path::PathBuf, time::Duration};
use storage::{
ContentStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage,
FileSystemRegistryStorage, NamespaceMapStorage, PublishInfo, RegistryDomain, RegistryStorage,
};
use thiserror::Error;
use warg_api::v1::{
fetch::{FetchError, FetchLogsRequest, FetchLogsResponse},
package::{
MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
UploadEndpoint,
},
proof::{ConsistencyRequest, InclusionRequest},
};
use warg_crypto::hash::Sha256;
use warg_crypto::{hash::AnyHash, signing, Encode, Signable};
use warg_protocol::package::ReleaseState;
use warg_protocol::{
operator, package,
registry::{LogId, LogLeaf, PackageName, RecordId, RegistryLen, TimestampedCheckpoint},
PublishedProtoEnvelope, SerdeEnvelope,
};
use wasm_compose::graph::{CompositionGraph, EncodeOptions, ExportIndex, InstanceId};
pub mod api;
mod config;
pub mod depsolve;
use depsolve::{Bundler, LockListBuilder};
pub mod version_util;
use version_util::{kindless_name, locked_package, versioned_package, Import, ImportKind};
pub mod lock;
mod registry_url;
pub mod storage;
pub use self::config::*;
pub use self::registry_url::RegistryUrl;
pub struct Client<R, C, N>
where
R: RegistryStorage,
C: ContentStorage,
N: NamespaceMapStorage,
{
registry: R,
content: C,
namespace_map: N,
api: api::Client,
}
impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C, N> {
pub fn new(
url: impl IntoUrl,
registry: R,
content: C,
namespace_map: N,
auth_token: Option<Secret<String>>,
) -> ClientResult<Self> {
let api = api::Client::new(url, auth_token)?;
Ok(Self {
registry,
content,
namespace_map,
api,
})
}
pub fn url(&self) -> &RegistryUrl {
self.api.url()
}
pub fn registry(&self) -> &R {
&self.registry
}
pub fn content(&self) -> &C {
&self.content
}
pub fn namespace_map(&self) -> &N {
&self.namespace_map
}
pub async fn store_namespace(
&self,
namespace: String,
registry_domain: RegistryDomain,
) -> Result<()> {
self.namespace_map
.store_namespace(namespace, registry_domain)
.await?;
Ok(())
}
pub async fn reset_namespaces(&self) -> Result<()> {
self.namespace_map.reset_namespaces().await?;
Ok(())
}
pub async fn reset_registry(&self, all_registries: bool) -> ClientResult<()> {
tracing::info!("resetting registry local state");
self.registry
.reset(all_registries)
.await
.or(Err(ClientError::ResettingRegistryLocalStateFailed))
}
pub async fn clear_content_cache(&self) -> ClientResult<()> {
tracing::info!("removing content cache");
self.content
.clear()
.await
.or(Err(ClientError::ClearContentCacheFailed))
}
pub async fn refresh_namespace(&mut self, namespace: &str) -> ClientResult<()> {
self.update_checkpoint(&self.api.latest_checkpoint().await?, vec![])
.await?;
let operator = self.registry().load_operator(&None).await?;
let operator_log_maps_namespace = if let Some(op) = operator {
let namespace_state = op.state.namespace_state(namespace);
if let Ok(Some(nm)) = namespace_state {
if let warg_protocol::operator::NamespaceState::Imported { registry } = nm {
self.api
.set_warg_registry(Some(RegistryDomain::from_str(registry)?));
}
true
} else {
false
}
} else {
false
};
if !operator_log_maps_namespace {
let map = self.namespace_map().load_namespace_map().await?;
if let Some(map) = map {
let namespace = map.get(namespace);
if let Some(nm) = namespace {
self.api
.set_warg_registry(Some(RegistryDomain::from_str(nm)?));
} else {
self.api.set_warg_registry(None);
}
}
}
Ok(())
}
pub fn get_warg_registry(&self) -> &Option<RegistryDomain> {
self.api.get_warg_registry()
}
pub async fn lock_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
let mut builder = LockListBuilder::default();
builder.build_list(self, info).await?;
let top = Import {
name: format!("{}:{}", info.name.namespace(), info.name.name()),
req: VersionReq::STAR,
kind: ImportKind::Unlocked,
};
builder.lock_list.insert(top);
let mut composer = CompositionGraph::new();
let mut handled = IndexMap::<String, InstanceId>::new();
for package in builder.lock_list {
let name = package.name.clone();
let version = package.req;
let id = PackageName::new(name)?;
let info = self
.registry()
.load_package(self.api.get_warg_registry(), &id)
.await?;
if let Some(inf) = info {
let release = if version != VersionReq::STAR {
inf.state
.releases()
.filter(|r| version.matches(&r.version))
.last()
} else {
inf.state.releases().last()
};
if let Some(r) = release {
let state = &r.state;
if let ReleaseState::Released { content } = state {
let locked_package = locked_package(&package.name, r, content);
let path = self.content().content_location(content);
if let Some(p) = path {
let bytes = fs::read(&p).map_err(|_| ClientError::ContentNotFound {
digest: content.clone(),
})?;
let read_digest =
AnyHash::from_str(&format!("sha256:{}", sha256::digest(bytes)))
.unwrap();
if content != &read_digest {
return Err(ClientError::IncorrectContent {
digest: read_digest,
expected: content.clone(),
});
}
let component =
wasm_compose::graph::Component::from_file(&locked_package, p)?;
let component_id = if let Some((id, _)) =
composer.get_component_by_name(&locked_package)
{
id
} else {
composer.add_component(component)?
};
let instance_id = composer.instantiate(component_id)?;
let added = composer.get_component(component_id);
handled.insert(versioned_package(&package.name, version), instance_id);
let mut args = Vec::new();
if let Some(added) = added {
for (index, name, _) in added.imports() {
let iid = handled.get(kindless_name(name));
if let Some(arg) = iid {
args.push((arg, index));
}
}
}
for arg in args {
composer.connect(
*arg.0,
None::<ExportIndex>,
instance_id,
arg.1,
)?;
}
}
}
}
}
}
let final_name = &format!("{}:{}", info.name.namespace(), &info.name.name());
let id = handled.get(final_name);
let options = EncodeOptions {
export: id.copied(),
..Default::default()
};
let locked = composer.encode(options)?;
fs::write("./locked.wasm", locked.as_slice()).map_err(|e| ClientError::Other(e.into()))?;
Ok(locked)
}
pub async fn bundle_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
let mut bundler = Bundler::new(self);
let path = PathBuf::from("./locked.wasm");
let locked = if !path.is_file() {
self.lock_component(info).await?
} else {
fs::read("./locked.wasm").map_err(|e| ClientError::Other(e.into()))?
};
let bundled = bundler.parse(&locked).await?;
fs::write("./bundled.wasm", bundled.as_slice())
.map_err(|e| ClientError::Other(e.into()))?;
Ok(bundled.as_slice().to_vec())
}
pub async fn publish(&self, signing_key: &signing::PrivateKey) -> ClientResult<RecordId> {
let info = self
.registry
.load_publish()
.await?
.ok_or(ClientError::NotPublishing)?;
let res = self.publish_with_info(signing_key, info).await;
self.registry.store_publish(None).await?;
res
}
pub async fn publish_with_info(
&self,
signing_key: &signing::PrivateKey,
mut info: PublishInfo,
) -> ClientResult<RecordId> {
if info.entries.is_empty() {
return Err(ClientError::NothingToPublish {
name: info.name.clone(),
});
}
let initializing = info.initializing();
tracing::info!(
"publishing {new}package `{name}`",
name = info.name,
new = if initializing { "new " } else { "" }
);
tracing::debug!("entries: {:?}", info.entries);
let mut package = self
.registry
.load_package(self.api.get_warg_registry(), &info.name)
.await?
.unwrap_or_else(|| PackageInfo::new(info.name.clone()));
if !initializing && info.head.is_none() {
self.update_checkpoint(&self.api.latest_checkpoint().await?, [&mut package])
.await?;
info.head = package.state.head().as_ref().map(|h| h.digest.clone());
}
match (initializing, info.head.is_some()) {
(true, true) => {
return Err(ClientError::CannotInitializePackage { name: package.name })
}
(false, false) => {
return Err(ClientError::MustInitializePackage { name: package.name })
}
_ => (),
}
let record = info.finalize(signing_key)?;
let log_id = LogId::package_log::<Sha256>(&package.name);
let record = self
.api
.publish_package_record(
&log_id,
PublishRecordRequest {
package_name: Cow::Borrowed(&package.name),
record: Cow::Owned(record.into()),
content_sources: Default::default(),
},
)
.await
.map_err(|e| {
ClientError::translate_log_not_found(e, |id| {
if id == &log_id {
Some(package.name.clone())
} else {
None
}
})
})?;
for (digest, MissingContent { upload }) in record.missing_content() {
let Some(UploadEndpoint::Http {
method,
url,
headers,
}) = upload.first()
else {
continue;
};
self.api
.upload_content(
method,
url,
headers,
Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
|| ClientError::ContentNotFound {
digest: digest.clone(),
},
)?),
)
.await
.map_err(|e| match e {
api::ClientError::Package(PackageError::Rejection(reason)) => {
ClientError::PublishRejected {
name: package.name.clone(),
record_id: record.record_id.clone(),
reason,
}
}
_ => e.into(),
})?;
}
Ok(record.record_id)
}
pub async fn wait_for_publish(
&self,
package: &PackageName,
record_id: &RecordId,
interval: Duration,
) -> ClientResult<()> {
let log_id = LogId::package_log::<Sha256>(package);
let mut current = self.get_package_record(package, &log_id, record_id).await?;
loop {
match current.state {
PackageRecordState::Sourcing { .. } => {
return Err(ClientError::PackageMissingContent);
}
PackageRecordState::Published { .. } => {
return Ok(());
}
PackageRecordState::Rejected { reason } => {
return Err(ClientError::PublishRejected {
name: package.clone(),
record_id: record_id.clone(),
reason,
});
}
PackageRecordState::Processing => {
tokio::time::sleep(interval).await;
current = self.get_package_record(package, &log_id, record_id).await?;
}
}
}
}
pub async fn update_all(&mut self) -> ClientResult<()> {
let packages = self.registry.load_all_packages().await?;
let checkpoints = self.api.latest_checkpoints(packages.keys()).await?;
self.update_checkpoints(checkpoints, packages).await?;
Ok(())
}
pub async fn update(&self) -> ClientResult<()> {
tracing::info!("updating all packages to latest checkpoint");
let mut updating = self.registry.load_packages().await?;
self.update_checkpoint(&self.api.latest_checkpoint().await?, &mut updating)
.await?;
Ok(())
}
pub async fn upsert<'a, I>(&self, packages: I) -> Result<(), ClientError>
where
I: IntoIterator<Item = &'a PackageName>,
I::IntoIter: ExactSizeIterator,
{
tracing::info!("updating specific packages to latest checkpoint");
let packages = packages.into_iter();
let mut updating = Vec::with_capacity(packages.len());
for package in packages {
updating.push(
self.registry
.load_package(self.api.get_warg_registry(), package)
.await?
.unwrap_or_else(|| PackageInfo::new(package.clone())),
);
}
self.update_checkpoint(&self.api.latest_checkpoint().await?, &mut updating)
.await?;
Ok(())
}
pub async fn download(
&self,
name: &PackageName,
requirement: &VersionReq,
) -> Result<Option<PackageDownload>, ClientError> {
tracing::info!("downloading package `{name}` with requirement `{requirement}`");
let info = self.fetch_package(name).await?;
match info.state.find_latest_release(requirement) {
Some(release) => {
let digest = release
.content()
.context("invalid state: not yanked but missing content")?
.clone();
let path = self.download_content(&digest).await?;
Ok(Some(PackageDownload {
version: release.version.clone(),
digest,
path,
}))
}
None => Ok(None),
}
}
pub async fn download_exact(
&self,
package: &PackageName,
version: &Version,
) -> Result<PackageDownload, ClientError> {
tracing::info!("downloading version {version} of package `{package}`");
let info = self.fetch_package(package).await?;
let release =
info.state
.release(version)
.ok_or_else(|| ClientError::PackageVersionDoesNotExist {
version: version.clone(),
name: package.clone(),
})?;
let digest = release
.content()
.ok_or_else(|| ClientError::PackageVersionDoesNotExist {
version: version.clone(),
name: package.clone(),
})?;
Ok(PackageDownload {
version: version.clone(),
digest: digest.clone(),
path: self.download_content(digest).await?,
})
}
async fn update_checkpoint<'a>(
&self,
ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
packages: impl IntoIterator<Item = &mut PackageInfo>,
) -> Result<(), ClientError> {
let checkpoint = &ts_checkpoint.as_ref().checkpoint;
tracing::info!(
"updating to checkpoint log length `{}`",
checkpoint.log_length
);
let mut operator = self
.registry
.load_operator(self.api.get_warg_registry())
.await?
.unwrap_or_default();
let mut packages = packages
.into_iter()
.filter_map(|p| match &p.checkpoint {
Some(c) if c == checkpoint => None,
_ => Some((LogId::package_log::<Sha256>(&p.name), p)),
})
.inspect(|(_, p)| tracing::info!("package `{name}` will be updated", name = p.name))
.collect::<IndexMap<_, _>>();
if packages.is_empty() {
return Ok(());
}
let mut last_known = packages
.iter()
.map(|(id, p)| (id.clone(), p.head_fetch_token.clone()))
.collect::<IndexMap<_, _>>();
loop {
let response: FetchLogsResponse = self
.api
.fetch_logs(FetchLogsRequest {
log_length: checkpoint.log_length,
operator: operator
.head_fetch_token
.as_ref()
.map(|t| Cow::Borrowed(t.as_str())),
limit: None,
packages: Cow::Borrowed(&last_known),
})
.await
.map_err(|e| {
ClientError::translate_log_not_found(e, |id| {
packages.get(id).map(|p| p.name.clone())
})
})?;
for record in response.operator {
let proto_envelope: PublishedProtoEnvelope<operator::OperatorRecord> =
record.envelope.try_into()?;
if operator.head_registry_index.is_none()
|| proto_envelope.registry_index > operator.head_registry_index.unwrap()
{
operator.state = operator
.state
.validate(&proto_envelope.envelope)
.map_err(|inner| ClientError::OperatorValidationFailed { inner })?;
operator.head_registry_index = Some(proto_envelope.registry_index);
operator.head_fetch_token = Some(record.fetch_token);
}
}
for (log_id, records) in response.packages {
let package = packages.get_mut(&log_id).ok_or_else(|| {
anyhow!("received records for unknown package log `{log_id}`")
})?;
for record in records {
let proto_envelope: PublishedProtoEnvelope<package::PackageRecord> =
record.envelope.try_into()?;
if package.head_registry_index.is_none()
|| proto_envelope.registry_index > package.head_registry_index.unwrap()
{
let state = std::mem::take(&mut package.state);
package.state =
state.validate(&proto_envelope.envelope).map_err(|inner| {
ClientError::PackageValidationFailed {
name: package.name.clone(),
inner,
}
})?;
package.head_registry_index = Some(proto_envelope.registry_index);
package.head_fetch_token = Some(record.fetch_token);
}
}
if package.state.head().is_none() {
return Err(ClientError::PackageLogEmpty {
name: package.name.clone(),
});
}
}
if !response.more {
break;
}
for (id, fetch_token) in last_known.iter_mut() {
*fetch_token = packages[id].head_fetch_token.clone();
}
}
TimestampedCheckpoint::verify(
operator.state.public_key(ts_checkpoint.key_id()).ok_or(
ClientError::InvalidCheckpointKeyId {
key_id: ts_checkpoint.key_id().clone(),
},
)?,
&ts_checkpoint.as_ref().encode(),
ts_checkpoint.signature(),
)
.or(Err(ClientError::InvalidCheckpointSignature))?;
let mut leaf_indices = Vec::with_capacity(packages.len() + 1 );
let mut leafs = Vec::with_capacity(leaf_indices.len());
if let Some(index) = operator.head_registry_index {
leaf_indices.push(index);
leafs.push(LogLeaf {
log_id: LogId::operator_log::<Sha256>(),
record_id: operator.state.head().as_ref().unwrap().digest.clone(),
});
} else {
return Err(ClientError::NoOperatorRecords);
}
for (log_id, package) in &packages {
if let Some(index) = package.head_registry_index {
leaf_indices.push(index);
leafs.push(LogLeaf {
log_id: log_id.clone(),
record_id: package.state.head().as_ref().unwrap().digest.clone(),
});
} else {
return Err(ClientError::PackageLogEmpty {
name: package.name.clone(),
});
}
}
if !leafs.is_empty() {
self.api
.prove_inclusion(
InclusionRequest {
log_length: checkpoint.log_length,
leafs: leaf_indices,
},
checkpoint,
&leafs,
)
.await?;
}
if let Some(from) = self
.registry
.load_checkpoint(self.api.get_warg_registry())
.await?
{
let from_log_length = from.as_ref().checkpoint.log_length;
let to_log_length = ts_checkpoint.as_ref().checkpoint.log_length;
match from_log_length.cmp(&to_log_length) {
Ordering::Greater => {
return Err(ClientError::CheckpointLogLengthRewind {
from: from_log_length,
to: to_log_length,
});
}
Ordering::Less => {
self.api
.prove_log_consistency(
ConsistencyRequest {
from: from_log_length,
to: to_log_length,
},
Cow::Borrowed(&from.as_ref().checkpoint.log_root),
Cow::Borrowed(&ts_checkpoint.as_ref().checkpoint.log_root),
)
.await?
}
Ordering::Equal => {
if from.as_ref().checkpoint.log_root
!= ts_checkpoint.as_ref().checkpoint.log_root
|| from.as_ref().checkpoint.map_root
!= ts_checkpoint.as_ref().checkpoint.map_root
{
return Err(ClientError::CheckpointChangedLogRootOrMapRoot {
log_length: from_log_length,
});
}
}
}
}
self.registry
.store_operator(self.api.get_warg_registry(), operator)
.await?;
for package in packages.values_mut() {
package.checkpoint = Some(checkpoint.clone());
self.registry
.store_package(self.api.get_warg_registry(), package)
.await?;
}
self.registry
.store_checkpoint(self.api.get_warg_registry(), ts_checkpoint)
.await?;
Ok(())
}
async fn update_checkpoints<'a>(
&mut self,
ts_checkpoints: IndexMap<std::string::String, SerdeEnvelope<TimestampedCheckpoint>>,
mut packages: IndexMap<String, Vec<PackageInfo>>,
) -> Result<(), ClientError> {
for (name, ts_checkpoint) in ts_checkpoints {
if self.url().safe_label() != name {
self.api
.set_warg_registry(Some(RegistryDomain::from_str(&name)?));
} else {
self.api.set_warg_registry(None)
}
let mut packages = packages.get_mut(&name.clone());
if let Some(pkgs) = &mut packages {
self.update_checkpoint(&ts_checkpoint, pkgs.as_mut_slice())
.await?;
}
}
Ok(())
}
async fn fetch_package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
match self
.registry
.load_package(self.api.get_warg_registry(), name)
.await?
{
Some(info) => {
tracing::info!("log for package `{name}` already exists in storage");
Ok(info)
}
None => {
let mut info = PackageInfo::new(name.clone());
self.update_checkpoint(&self.api.latest_checkpoint().await?, [&mut info])
.await?;
Ok(info)
}
}
}
async fn get_package_record(
&self,
package: &PackageName,
log_id: &LogId,
record_id: &RecordId,
) -> ClientResult<PackageRecord> {
let record = self
.api
.get_package_record(log_id, record_id)
.await
.map_err(|e| {
ClientError::translate_log_not_found(e, |id| {
if id == log_id {
Some(package.clone())
} else {
None
}
})
})?;
Ok(record)
}
pub async fn download_content(&self, digest: &AnyHash) -> Result<PathBuf, ClientError> {
match self.content.content_location(digest) {
Some(path) => {
tracing::info!("content for digest `{digest}` already exists in storage");
Ok(path)
}
None => {
self.content
.store_content(
Box::pin(self.api.download_content(digest).await?),
Some(digest),
)
.await?;
self.content
.content_location(digest)
.ok_or_else(|| ClientError::ContentNotFound {
digest: digest.clone(),
})
}
}
}
}
pub type FileSystemClient =
Client<FileSystemRegistryStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage>;
pub enum StorageLockResult<T> {
Acquired(T),
NotAcquired(PathBuf),
}
impl FileSystemClient {
pub fn try_new_with_config(
url: Option<&str>,
config: &Config,
auth_token: Option<Secret<String>>,
) -> Result<StorageLockResult<Self>, ClientError> {
let StoragePaths {
registry_url: url,
registries_dir,
content_dir,
namespace_map_path,
} = config.storage_paths_for_url(url)?;
let (packages, content, namespace_map) = match (
FileSystemRegistryStorage::try_lock(registries_dir.clone())?,
FileSystemContentStorage::try_lock(content_dir.clone())?,
FileSystemNamespaceMapStorage::new(namespace_map_path.clone()),
) {
(Some(packages), Some(content), namespace_map) => (packages, content, namespace_map),
(None, _, _) => return Ok(StorageLockResult::NotAcquired(registries_dir)),
(_, None, _) => return Ok(StorageLockResult::NotAcquired(content_dir)),
};
Ok(StorageLockResult::Acquired(Self::new(
url.into_url(),
packages,
content,
namespace_map,
auth_token,
)?))
}
pub fn new_with_config(
url: Option<&str>,
config: &Config,
auth_token: Option<Secret<String>>,
) -> Result<Self, ClientError> {
let StoragePaths {
registry_url,
registries_dir,
content_dir,
namespace_map_path,
} = config.storage_paths_for_url(url)?;
Self::new(
registry_url.into_url(),
FileSystemRegistryStorage::lock(registries_dir)?,
FileSystemContentStorage::lock(content_dir)?,
FileSystemNamespaceMapStorage::new(namespace_map_path),
auth_token,
)
}
}
#[derive(Debug, Clone)]
pub struct PackageDownload {
pub version: Version,
pub digest: AnyHash,
pub path: PathBuf,
}
#[derive(Debug, Error)]
pub enum ClientError {
#[error("no home registry registry server URL is configured")]
NoHomeRegistryUrl,
#[error("reset registry state failed")]
ResettingRegistryLocalStateFailed,
#[error("clear content cache failed")]
ClearContentCacheFailed,
#[error("invalid checkpoint signature")]
InvalidCheckpointSignature,
#[error("invalid checkpoint key ID `{key_id}`")]
InvalidCheckpointKeyId {
key_id: signing::KeyID,
},
#[error("the server did not provide any operator records")]
NoOperatorRecords,
#[error("operator failed validation: {inner}")]
OperatorValidationFailed {
inner: operator::ValidationError,
},
#[error("package `{name}` already exists and cannot be initialized")]
CannotInitializePackage {
name: PackageName,
},
#[error("package `{name}` must be initialized before publishing")]
MustInitializePackage {
name: PackageName,
},
#[error("there is no publish operation in progress")]
NotPublishing,
#[error("package `{name}` has no records to publish")]
NothingToPublish {
name: PackageName,
},
#[error("package `{name}` does not exist")]
PackageDoesNotExist {
name: PackageName,
},
#[error("package `{name}` does not exist")]
PackageDoesNotExistWithHint {
name: PackageName,
hint: HeaderValue,
},
#[error("version `{version}` of package `{name}` does not exist")]
PackageVersionDoesNotExist {
version: Version,
name: PackageName,
},
#[error("package `{name}` failed validation: {inner}")]
PackageValidationFailed {
name: PackageName,
inner: package::ValidationError,
},
#[error("content with digest `{digest}` was not found in client storage")]
ContentNotFound {
digest: AnyHash,
},
#[error("content with digest `{digest}` was not found expected `{expected}`")]
IncorrectContent {
digest: AnyHash,
expected: AnyHash,
},
#[error("package log is empty and cannot be validated")]
PackageLogEmpty {
name: PackageName,
},
#[error("the publishing of package `{name}` was rejected due to: {reason}")]
PublishRejected {
name: PackageName,
record_id: RecordId,
reason: String,
},
#[error("the package is still missing content after all content was uploaded")]
PackageMissingContent,
#[error("registry rewinded checkpoints; latest checkpoint log length `{to}` is less than previously received checkpoint log length `{from}`")]
CheckpointLogLengthRewind {
from: RegistryLen,
to: RegistryLen,
},
#[error("registry provided a new checkpoint with the same log length `{log_length}` as previously fetched but different log root or map root")]
CheckpointChangedLogRootOrMapRoot {
log_length: RegistryLen,
},
#[error(transparent)]
Api(#[from] api::ClientError),
#[error("{0:?}")]
Other(#[from] anyhow::Error),
}
impl ClientError {
fn translate_log_not_found(
e: api::ClientError,
lookup: impl Fn(&LogId) -> Option<PackageName>,
) -> Self {
match &e {
api::ClientError::Fetch(FetchError::LogNotFound(id))
| api::ClientError::Package(PackageError::LogNotFound(id)) => {
if let Some(name) = lookup(id) {
return Self::PackageDoesNotExist { name };
}
}
api::ClientError::LogNotFoundWithHint(log_id, hint) => {
if let Some(name) = lookup(log_id) {
return Self::PackageDoesNotExistWithHint {
name,
hint: hint.clone(),
};
}
}
_ => {}
}
Self::Api(e)
}
}
pub type ClientResult<T> = Result<T, ClientError>;