use std::sync::Arc;
use crate::keys;
use crate::object_store::azure::AzureStore;
use crate::object_store::s3::S3Store;
use crate::object_store::{BoxError, ObjectStore, ObjectStoreError};
use crate::url::{RemoteUrl, StorageEngine};
pub use crate::url::BackendKind;
/// Failures reported while opening a remote backend and checking its stored
/// `FORMAT` marker.
///
/// Variants that carry a `kind` render the storage location with the word
/// returned by `container_word` for that backend.
#[derive(Debug, thiserror::Error)]
pub enum BackendError {
/// Produced by `classify` when the object store reports `NotFound`.
/// `name` is the bucket/container name passed to `classify`.
#[error("{} not found {name}", container_word(*kind))]
BucketNotFound {
kind: BackendKind,
name: String,
},
/// Produced by `classify` when the object store reports `AccessDenied`.
/// `action` is the operation label supplied by the caller of `classify`.
#[error("user not authorized to perform {action} on {} {name}", container_word(*kind))]
NotAuthorized {
kind: BackendKind,
action: String,
name: String,
},
/// Wraps the payload of `ObjectStoreError::Network`.
#[error("connection error: {source}")]
Network {
#[source]
source: BoxError,
},
/// Catch-all variant: `classify` and `validate_format` route every
/// `ObjectStoreError` that has no dedicated variant here, and
/// `validate_format` also uses it for a `FORMAT` object that is not valid
/// UTF-8. The underlying cause is therefore not necessarily a credential
/// problem; inspect `source`.
#[error("invalid credentials {source}")]
InvalidCredentials {
#[source]
source: ObjectStoreError,
},
/// The `FORMAT` object was readable text, but `StorageEngine::from_name`
/// did not recognise it. `stored` holds the trimmed text that was found.
#[error(
"{} uses unknown storage engine `{stored}`; \
this client supports {}",
container_word(*kind),
StorageEngine::supported_list_str()
)]
UnknownStoredEngine {
kind: BackendKind,
stored: String,
},
/// The engine requested through the remote URL differs from the engine
/// recorded in the `FORMAT` object.
#[error(
"URL specifies engine `{url_engine}` but this {} uses `{stored_engine}`; \
remove the `?engine=` parameter from the remote URL",
container_word(*kind)
)]
EngineMismatch {
kind: BackendKind,
url_engine: StorageEngine,
stored_engine: StorageEngine,
},
}
/// Returns the noun used in user-facing messages for the storage location of
/// the given backend: `"bucket"` for S3 and `"container"` for Azure.
const fn container_word(kind: BackendKind) -> &'static str {
    // Deliberately an exhaustive match with no wildcard arm: adding a
    // `BackendKind` variant will not compile until it is given a word here.
    match kind {
        BackendKind::Azure => "container",
        BackendKind::S3 => "bucket",
    }
}
/// Renders `err` as a single `fatal: …` message.
///
/// The message starts with the `Display` text of `err`; the buffer is then
/// handed, together with `err`, to `super::append_source_chain` so it can
/// extend the message before it is returned.
#[must_use]
pub fn fatal_message(err: &BackendError) -> String {
    let mut rendered = String::from("fatal: ");
    rendered.push_str(&err.to_string());
    super::append_source_chain(&mut rendered, err);
    rendered
}
/// Translates an `ObjectStoreError` raised while opening or probing a backend
/// into the matching `BackendError`.
///
/// * `kind` / `name` identify the bucket or container for the message.
/// * `action` is the operation label reported in `NotAuthorized`.
///
/// `NotFound`, `AccessDenied` and `Network` have dedicated variants; every
/// other store error is reported as `InvalidCredentials` with the original
/// error kept as its source.
fn classify(
    kind: BackendKind,
    name: &str,
    action: &'static str,
    err: ObjectStoreError,
) -> BackendError {
    match err {
        ObjectStoreError::Network(source) => BackendError::Network { source },
        ObjectStoreError::AccessDenied(_) => BackendError::NotAuthorized {
            kind,
            action: String::from(action),
            name: String::from(name),
        },
        ObjectStoreError::NotFound(_) => BackendError::BucketNotFound {
            kind,
            name: String::from(name),
        },
        // Anything without a dedicated variant falls through to here.
        source => BackendError::InvalidCredentials { source },
    }
}
/// Determines which storage engine a remote uses by reading its `FORMAT`
/// object, and checks it against the engine requested in the URL (if any).
///
/// The key is built with `keys::join(Some(prefix), "FORMAT")`.
///
/// # Returns
///
/// * The stored engine when `FORMAT` exists, names a known engine, and does
///   not contradict `url_engine`.
/// * `url_engine`, or `StorageEngine::Bundle` when no engine was requested,
///   when the store reports `FORMAT` as not found.
///
/// # Errors
///
/// * `BackendError::Network` when reading the key fails with a network error.
/// * `BackendError::InvalidCredentials` for any other read failure, and when
///   the stored bytes are not valid UTF-8.
/// * `BackendError::UnknownStoredEngine` when the trimmed content is not
///   recognised by `StorageEngine::from_name`.
/// * `BackendError::EngineMismatch` when `url_engine` is set and differs from
///   the stored engine.
pub async fn validate_format(
    kind: BackendKind,
    store: &dyn ObjectStore,
    prefix: &str,
    url_engine: Option<StorageEngine>,
) -> Result<StorageEngine, BackendError> {
    let key = keys::join(Some(prefix), "FORMAT");

    let raw = match store.get_bytes(&key).await {
        Ok(raw) => raw,
        // No marker yet: honour the URL's choice, defaulting to Bundle.
        Err(ObjectStoreError::NotFound(_)) => {
            return Ok(url_engine.unwrap_or(StorageEngine::Bundle));
        }
        Err(ObjectStoreError::Network(source)) => {
            return Err(BackendError::Network { source });
        }
        Err(source) => return Err(BackendError::InvalidCredentials { source }),
    };

    let Ok(text) = std::str::from_utf8(&raw) else {
        let cause = std::io::Error::other("FORMAT key contains non-UTF-8 bytes");
        return Err(BackendError::InvalidCredentials {
            source: ObjectStoreError::Other(Box::new(cause)),
        });
    };

    // Surrounding whitespace (e.g. a trailing newline) is not significant.
    let stored = text.trim();

    let Some(stored_engine) = StorageEngine::from_name(stored) else {
        return Err(BackendError::UnknownStoredEngine {
            kind,
            stored: stored.to_owned(),
        });
    };

    match url_engine {
        Some(url_engine) if url_engine != stored_engine => Err(BackendError::EngineMismatch {
            kind,
            url_engine,
            stored_engine,
        }),
        _ => Ok(stored_engine),
    }
}
pub async fn build(
remote: &RemoteUrl,
) -> Result<(Arc<dyn ObjectStore>, StorageEngine), BackendError> {
let prefix = remote.prefix().unwrap_or_default();
let url_engine = remote.flags().engine;
let store: Arc<dyn ObjectStore> = match remote {
RemoteUrl::S3 { bucket, .. } => {
let store = S3Store::from_remote_url(remote)
.await
.map_err(|e| classify(BackendKind::S3, bucket, "ListObjectsV2", e))?;
store
.probe(prefix)
.await
.map_err(|e| classify(BackendKind::S3, bucket, "ListObjectsV2", e))?;
Arc::new(store)
}
RemoteUrl::Azure { container, .. } => {
let store = AzureStore::from_remote_url(remote)
.await
.map_err(|e| classify(BackendKind::Azure, container, "ListBlobs", e))?;
store
.probe(prefix)
.await
.map_err(|e| classify(BackendKind::Azure, container, "ListBlobs", e))?;
Arc::new(store)
}
};
let engine = validate_format(remote.kind(), store.as_ref(), prefix, url_engine).await?;
Ok((store, engine))
}
// Unit tests for this module, covering three areas:
//   * `classify`        — mapping of `ObjectStoreError` to `BackendError`,
//   * `fatal_message`   — exact user-facing wording per variant and backend,
//   * `validate_format` — FORMAT lookup, parsing, and URL/stored comparison.
#[cfg(test)]
mod tests {
use super::*;
use crate::object_store::mock::MockStore;
use bytes::Bytes;
// Builds a boxed error whose `Display` output is exactly `message`.
fn boxed(message: &str) -> crate::object_store::BoxError {
Box::new(std::io::Error::other(message.to_string()))
}
// ---- classify -------------------------------------------------------
#[test]
fn classify_maps_not_found_to_bucket_not_found_for_s3() {
let err = classify(
BackendKind::S3,
"mybucket",
"ListObjectsV2",
ObjectStoreError::NotFound("mybucket".into()),
);
assert!(matches!(
err,
BackendError::BucketNotFound {
kind: BackendKind::S3,
ref name
} if name == "mybucket"
));
}
// The Azure path reuses the `BucketNotFound` variant; only `kind` differs.
#[test]
fn classify_maps_not_found_to_bucket_not_found_for_azure() {
let err = classify(
BackendKind::Azure,
"mycontainer",
"ListBlobs",
ObjectStoreError::NotFound("mycontainer".into()),
);
assert!(matches!(
err,
BackendError::BucketNotFound {
kind: BackendKind::Azure,
ref name
} if name == "mycontainer"
));
}
// `kind`, `action` and `name` must all be carried over unchanged.
#[test]
fn classify_maps_access_denied_to_not_authorized() {
let err = classify(
BackendKind::S3,
"mybucket",
"ListObjectsV2",
ObjectStoreError::AccessDenied("mybucket".into()),
);
let BackendError::NotAuthorized { kind, action, name } = err else {
panic!("expected NotAuthorized");
};
assert_eq!(kind, BackendKind::S3);
assert_eq!(action, "ListObjectsV2");
assert_eq!(name, "mybucket");
}
// The inner network error is moved into `BackendError::Network` as-is.
#[test]
fn classify_maps_network_to_network_error() {
let err = classify(
BackendKind::S3,
"mybucket",
"ListObjectsV2",
ObjectStoreError::Network(boxed("dns failure")),
);
let BackendError::Network { source } = err else {
panic!("expected Network, got {err:?}");
};
assert_eq!(source.to_string(), "dns failure");
}
#[test]
fn classify_maps_other_to_invalid_credentials() {
let err = classify(
BackendKind::Azure,
"mycontainer",
"ListBlobs",
ObjectStoreError::Other(boxed("missing AZ_CRED env var")),
);
let BackendError::InvalidCredentials { source } = err else {
panic!("expected InvalidCredentials");
};
assert_eq!(source.to_string(), "missing AZ_CRED env var");
}
// Pins the catch-all arm: a variant with no dedicated mapping still ends
// up as `InvalidCredentials`.
#[test]
fn classify_maps_precondition_failed_to_invalid_credentials() {
let err = classify(
BackendKind::S3,
"mybucket",
"ListObjectsV2",
ObjectStoreError::PreconditionFailed("mybucket".into()),
);
assert!(matches!(err, BackendError::InvalidCredentials { .. }));
}
// ---- fatal_message --------------------------------------------------
// These tests compare against the full expected string, so any wording
// change in an `#[error]` attribute shows up here.
#[test]
fn fatal_message_s3_bucket_not_found_renders_expected_wording() {
let err = BackendError::BucketNotFound {
kind: BackendKind::S3,
name: "mybucket".into(),
};
assert_eq!(fatal_message(&err), "fatal: bucket not found mybucket");
}
#[test]
fn fatal_message_azure_container_not_found() {
let err = BackendError::BucketNotFound {
kind: BackendKind::Azure,
name: "mycontainer".into(),
};
assert_eq!(
fatal_message(&err),
"fatal: container not found mycontainer"
);
}
#[test]
fn fatal_message_not_authorized_renders_expected_wording() {
let err = BackendError::NotAuthorized {
kind: BackendKind::S3,
action: "ListObjectsV2".into(),
name: "mybucket".into(),
};
assert_eq!(
fatal_message(&err),
"fatal: user not authorized to perform ListObjectsV2 on bucket mybucket"
);
}
#[test]
fn fatal_message_azure_not_authorized_uses_container_word() {
let err = BackendError::NotAuthorized {
kind: BackendKind::Azure,
action: "ListBlobs".into(),
name: "mycontainer".into(),
};
let fatal = fatal_message(&err);
assert_eq!(
fatal,
"fatal: user not authorized to perform ListBlobs on container mycontainer"
);
// Guard against S3 terminology leaking into Azure messages.
assert!(
!fatal.contains("bucket"),
"Azure path leaked 'bucket': {fatal}"
);
}
// The source text must appear exactly once in the rendered message.
#[test]
fn fatal_message_invalid_credentials_appends_source() {
let err = BackendError::InvalidCredentials {
source: ObjectStoreError::Other(boxed("credential acquisition failed")),
};
assert_eq!(
fatal_message(&err),
"fatal: invalid credentials credential acquisition failed"
);
}
#[test]
fn fatal_message_network_includes_root_cause() {
let err = BackendError::Network {
source: boxed("dns lookup failed"),
};
assert_eq!(
fatal_message(&err),
"fatal: connection error: dns lookup failed"
);
}
// Uses a two-level error (outer message + inner `source()`) to check that
// causes nested below the direct source are rendered too.
#[test]
fn fatal_message_walks_full_chain() {
use std::error::Error as StdError;
use std::fmt;
#[derive(Debug)]
struct WrappedError {
msg: &'static str,
inner: Box<dyn StdError + Send + Sync + 'static>,
}
impl fmt::Display for WrappedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.msg)
}
}
impl StdError for WrappedError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
Some(self.inner.as_ref())
}
}
let err = BackendError::Network {
source: Box::new(WrappedError {
msg: "dispatch failure",
inner: boxed("connection refused"),
}),
};
assert_eq!(
fatal_message(&err),
"fatal: connection error: dispatch failure: connection refused"
);
}
#[test]
fn fatal_message_engine_mismatch() {
let url_engine = StorageEngine::Packchain;
let stored_engine = StorageEngine::Bundle;
let err = BackendError::EngineMismatch {
kind: BackendKind::S3,
url_engine,
stored_engine,
};
let expected = "\
fatal: URL specifies engine `packchain` but this bucket uses `bundle`; \
remove the `?engine=` parameter from the remote URL";
assert_eq!(fatal_message(&err), expected);
}
#[test]
fn fatal_message_azure_engine_mismatch_uses_container_word() {
let err = BackendError::EngineMismatch {
kind: BackendKind::Azure,
url_engine: StorageEngine::Packchain,
stored_engine: StorageEngine::Bundle,
};
let fatal = fatal_message(&err);
let expected = "\
fatal: URL specifies engine `packchain` but this container uses `bundle`; \
remove the `?engine=` parameter from the remote URL";
assert_eq!(fatal, expected);
assert!(
!fatal.contains("bucket"),
"Azure path leaked 'bucket': {fatal}"
);
}
// ---- validate_format ------------------------------------------------
// With no FORMAT object the result is the URL engine, or Bundle when the
// URL did not request one; checked with and without a prefix.
#[tokio::test]
async fn validate_format_passes_when_key_absent() {
let store = MockStore::new();
assert_eq!(
validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap(),
StorageEngine::Bundle,
);
assert_eq!(
validate_format(BackendKind::S3, &store, "my-repo", None)
.await
.unwrap(),
StorageEngine::Bundle,
);
assert_eq!(
validate_format(BackendKind::S3, &store, "", Some(StorageEngine::Packchain))
.await
.unwrap(),
StorageEngine::Packchain,
);
}
#[tokio::test]
async fn validate_format_passes_when_stored_engine_matches_url() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"bundle"));
assert_eq!(
validate_format(BackendKind::S3, &store, "", Some(StorageEngine::Bundle))
.await
.unwrap(),
StorageEngine::Bundle,
);
}
#[tokio::test]
async fn validate_format_passes_when_no_url_engine_declared() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"bundle"));
assert_eq!(
validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap(),
StorageEngine::Bundle,
);
}
// Stored content is trimmed before it is parsed.
#[tokio::test]
async fn validate_format_passes_when_key_has_trailing_newline() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"bundle\n"));
assert_eq!(
validate_format(BackendKind::S3, &store, "", Some(StorageEngine::Bundle))
.await
.unwrap(),
StorageEngine::Bundle,
);
}
// Mismatch is checked in both directions (this test and the next).
#[tokio::test]
async fn validate_format_rejects_url_packchain_against_stored_bundle() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"bundle"));
let err = validate_format(BackendKind::S3, &store, "", Some(StorageEngine::Packchain))
.await
.unwrap_err();
assert!(
matches!(
err,
BackendError::EngineMismatch {
kind: BackendKind::S3,
url_engine: StorageEngine::Packchain,
stored_engine: StorageEngine::Bundle,
}
),
"expected EngineMismatch(url=packchain, stored=bundle), got {err:?}",
);
}
#[tokio::test]
async fn validate_format_rejects_url_bundle_against_stored_packchain() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"packchain"));
let err = validate_format(BackendKind::S3, &store, "", Some(StorageEngine::Bundle))
.await
.unwrap_err();
assert!(
matches!(
err,
BackendError::EngineMismatch {
kind: BackendKind::S3,
url_engine: StorageEngine::Bundle,
stored_engine: StorageEngine::Packchain,
}
),
"expected EngineMismatch(url=bundle, stored=packchain), got {err:?}",
);
}
// The `kind` argument must reach the error so the message says
// "container" rather than "bucket".
#[tokio::test]
async fn validate_format_propagates_azure_kind_into_engine_mismatch() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"bundle"));
let err = validate_format(
BackendKind::Azure,
&store,
"",
Some(StorageEngine::Packchain),
)
.await
.unwrap_err();
let BackendError::EngineMismatch { kind, .. } = &err else {
panic!("expected EngineMismatch, got {err:?}");
};
assert_eq!(*kind, BackendKind::Azure);
let fatal = fatal_message(&err);
assert!(
fatal.contains("container") && !fatal.contains("bucket"),
"Azure EngineMismatch must use 'container', got `{fatal}`",
);
}
#[tokio::test]
async fn validate_format_passes_stored_packchain_with_no_url_engine() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"packchain"));
assert_eq!(
validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap(),
StorageEngine::Packchain,
);
}
#[tokio::test]
async fn validate_format_passes_stored_packchain_with_matching_url() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"packchain"));
assert_eq!(
validate_format(BackendKind::S3, &store, "", Some(StorageEngine::Packchain))
.await
.unwrap(),
StorageEngine::Packchain,
);
}
// "pack" is expected not to be a recognised engine name.
#[tokio::test]
async fn validate_format_rejects_unknown_stored_engine() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"pack"));
let err = validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap_err();
assert!(
matches!(
err,
BackendError::UnknownStoredEngine { kind: BackendKind::S3, ref stored }
if stored == "pack"
),
"expected UnknownStoredEngine(pack), got {err:?}",
);
}
#[tokio::test]
async fn validate_format_propagates_azure_kind_into_unknown_stored_engine() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"pack"));
let err = validate_format(BackendKind::Azure, &store, "", None)
.await
.unwrap_err();
let BackendError::UnknownStoredEngine { kind, stored } = &err else {
panic!("expected UnknownStoredEngine, got {err:?}");
};
assert_eq!(*kind, BackendKind::Azure);
assert_eq!(stored, "pack");
let fatal = fatal_message(&err);
assert!(
fatal.contains("container uses unknown storage engine"),
"Azure UnknownStoredEngine must use 'container', got `{fatal}`",
);
assert!(
!fatal.contains("bucket"),
"Azure path leaked 'bucket': `{fatal}`",
);
}
// Two FORMAT objects are planted: a valid one under the prefix and a
// bogus one at the root. Each call must read only the key that matches
// the prefix it was given.
#[tokio::test]
async fn validate_format_uses_prefix_for_key_lookup() {
let store = MockStore::new();
store.insert("my-repo/FORMAT", Bytes::from_static(b"bundle"));
store.insert(
"FORMAT",
Bytes::from_static(b"INVALID_SENTINEL_NEVER_AN_ENGINE"),
);
let err = validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap_err();
assert!(
matches!(
err,
BackendError::UnknownStoredEngine { kind: BackendKind::S3, ref stored }
if stored == "INVALID_SENTINEL_NEVER_AN_ENGINE"
),
"expected UnknownStoredEngine(INVALID_SENTINEL_NEVER_AN_ENGINE), got {err:?}",
);
// Only success is asserted here; the returned engine is not checked.
validate_format(BackendKind::S3, &store, "my-repo", None)
.await
.unwrap();
}
// Non-UTF-8 content is reported through `InvalidCredentials` wrapping an
// `ObjectStoreError::Other`; both the variant and the inner message are
// checked, as is the rendered fatal line.
#[tokio::test]
async fn validate_format_rejects_non_utf8_format_bytes() {
let store = MockStore::new();
store.insert("FORMAT", Bytes::from_static(b"\xff\xff\xff"));
let err = validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap_err();
let BackendError::InvalidCredentials { source } = &err else {
panic!("expected InvalidCredentials, got {err:?}");
};
let ObjectStoreError::Other(inner) = source else {
panic!("expected Other inside InvalidCredentials, got {source:?}");
};
let msg = inner.to_string();
assert!(
msg.contains("non-UTF-8") && msg.contains("FORMAT"),
"expected message naming the FORMAT key and non-UTF-8 cause, got `{msg}`",
);
let fatal = fatal_message(&err);
assert!(
fatal.contains("invalid credentials") && fatal.contains("non-UTF-8"),
"fatal_message must surface variant + non-UTF-8 source, got `{fatal}`",
);
}
// The message must list every engine in `StorageEngine::ALL`, each wrapped
// in backticks.
#[test]
fn unknown_stored_engine_error_message() {
let err = BackendError::UnknownStoredEngine {
kind: BackendKind::S3,
stored: "pack".into(),
};
let fatal = fatal_message(&err);
assert!(
fatal.starts_with("fatal: bucket uses unknown storage engine `pack`;"),
"missing prefix in {fatal}",
);
for engine in StorageEngine::ALL {
assert!(
fatal.contains(&format!("`{}`", engine.as_str())),
"fatal_message for UnknownStoredEngine must mention engine `{}`, got `{fatal}`",
engine.as_str(),
);
}
}
#[test]
fn unknown_stored_engine_error_message_azure_uses_container_word() {
let err = BackendError::UnknownStoredEngine {
kind: BackendKind::Azure,
stored: "pack".into(),
};
let fatal = fatal_message(&err);
assert!(
fatal.starts_with("fatal: container uses unknown storage engine `pack`;"),
"Azure UnknownStoredEngine must use 'container', got `{fatal}`",
);
assert!(
!fatal.contains("bucket"),
"Azure path leaked 'bucket': {fatal}"
);
}
// Arms a mock fault for the FORMAT key and expects the read failure to
// surface as `BackendError::Network`.
#[tokio::test]
async fn validate_format_returns_network_error_on_transport_failure() {
use crate::object_store::mock::Fault;
let store = MockStore::new();
store.arm(Fault::NetworkOnGetBytes {
key: "FORMAT".into(),
});
let err = validate_format(BackendKind::S3, &store, "", None)
.await
.unwrap_err();
assert!(
matches!(err, BackendError::Network { .. }),
"expected Network, got {err:?}",
);
}
}