use alloc::string::String;
use core::fmt;
use crate::mapping::MappingError;
use crate::routing::ResolutionError;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorScope {
Transfer,
Link,
Connection,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AmqpErrorCondition {
DecodeError,
NotImplemented,
NotFound,
ResourceLimitExceeded,
UnauthorizedAccess,
PreconditionFailed,
FramingError,
}
impl AmqpErrorCondition {
#[must_use]
pub const fn as_symbol(self) -> &'static str {
match self {
Self::DecodeError => "amqp:decode-error",
Self::NotImplemented => "amqp:not-implemented",
Self::NotFound => "amqp:not-found",
Self::ResourceLimitExceeded => "amqp:resource-limit-exceeded",
Self::UnauthorizedAccess => "amqp:unauthorized-access",
Self::PreconditionFailed => "amqp:precondition-failed",
Self::FramingError => "amqp:connection:framing-error",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorDescription {
pub spec_section: String,
pub message: String,
}
impl ErrorDescription {
pub fn new(spec_section: impl Into<String>, message: impl Into<String>) -> Self {
Self {
spec_section: spec_section.into(),
message: message.into(),
}
}
#[must_use]
pub fn render(&self) -> String {
alloc::format!("{}: {}", self.spec_section, self.message)
}
}
impl fmt::Display for ErrorDescription {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}: {}", self.spec_section, self.message)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AmqpError {
pub condition: AmqpErrorCondition,
pub scope: ErrorScope,
pub description: ErrorDescription,
}
impl AmqpError {
pub fn new(
condition: AmqpErrorCondition,
scope: ErrorScope,
description: ErrorDescription,
) -> Self {
Self {
condition,
scope,
description,
}
}
}
#[must_use]
pub fn map_mapping_error(err: &MappingError) -> AmqpError {
match err {
MappingError::InvalidUtf8 => AmqpError::new(
AmqpErrorCondition::DecodeError,
ErrorScope::Transfer,
ErrorDescription::new("§11.1", "JSON body is not valid UTF-8"),
),
MappingError::InvalidJson(msg) => AmqpError::new(
AmqpErrorCondition::DecodeError,
ErrorScope::Transfer,
ErrorDescription::new("§11.1", alloc::format!("JSON body parse error: {msg}")),
),
MappingError::EmptyBody => AmqpError::new(
AmqpErrorCondition::DecodeError,
ErrorScope::Transfer,
ErrorDescription::new("§11.1", "body section is empty"),
),
}
}
#[must_use]
pub fn map_resolution_error(err: &ResolutionError, permit_dynamic_topics: bool) -> AmqpError {
match err {
ResolutionError::NoRoute(addr) => {
if permit_dynamic_topics {
AmqpError::new(
AmqpErrorCondition::NotFound,
ErrorScope::Link,
ErrorDescription::new(
"§7.5.1",
alloc::format!(
"address '{addr}' not in catalog (dynamic-topic creation enabled)"
),
),
)
} else {
AmqpError::new(
AmqpErrorCondition::NotFound,
ErrorScope::Link,
ErrorDescription::new(
"§7.5.1",
alloc::format!(
"address '{addr}' not in catalog and permit_dynamic_topics = false"
),
),
)
}
}
ResolutionError::Malformed(addr) => AmqpError::new(
AmqpErrorCondition::DecodeError,
ErrorScope::Link,
ErrorDescription::new("§7.3", alloc::format!("malformed AMQP address '{addr}'")),
),
}
}
#[must_use]
pub fn resource_limit_exceeded(
spec_section: impl Into<String>,
message: impl Into<String>,
) -> AmqpError {
AmqpError::new(
AmqpErrorCondition::ResourceLimitExceeded,
ErrorScope::Connection,
ErrorDescription::new(spec_section, message),
)
}
#[must_use]
pub fn unsettled_state_not_implemented() -> AmqpError {
AmqpError::new(
AmqpErrorCondition::NotImplemented,
ErrorScope::Link,
ErrorDescription::new(
"§7.4.2",
"terminus.durable = unsettled-state requires broker functionality (out of scope)",
),
)
}
#[must_use]
pub fn unknown_dds_operation(value: &str) -> AmqpError {
AmqpError::new(
AmqpErrorCondition::NotImplemented,
ErrorScope::Link,
ErrorDescription::new(
"§7.7",
alloc::format!("unknown dds:operation value '{value}'"),
),
)
}
#[must_use]
pub fn instance_unknown(op: &str, key: &str) -> AmqpError {
AmqpError::new(
AmqpErrorCondition::PreconditionFailed,
ErrorScope::Transfer,
ErrorDescription::new(
"§11.3",
alloc::format!("dds:operation = {op} on unknown instance key '{key}'"),
),
)
}
#[must_use]
pub fn register_missing_key() -> AmqpError {
AmqpError::new(
AmqpErrorCondition::DecodeError,
ErrorScope::Transfer,
ErrorDescription::new(
"§11.3",
"dds:operation = register but body lacks key fields",
),
)
}
#[must_use]
pub fn access_denied(subject: &str, address: &str) -> AmqpError {
AmqpError::new(
AmqpErrorCondition::UnauthorizedAccess,
ErrorScope::Link,
ErrorDescription::new(
"§10.3.3",
alloc::format!("subject '{subject}' denied access to '{address}'"),
),
)
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn condition_symbols_match_spec() {
assert_eq!(
AmqpErrorCondition::DecodeError.as_symbol(),
"amqp:decode-error"
);
assert_eq!(
AmqpErrorCondition::NotImplemented.as_symbol(),
"amqp:not-implemented"
);
assert_eq!(AmqpErrorCondition::NotFound.as_symbol(), "amqp:not-found");
assert_eq!(
AmqpErrorCondition::ResourceLimitExceeded.as_symbol(),
"amqp:resource-limit-exceeded"
);
assert_eq!(
AmqpErrorCondition::UnauthorizedAccess.as_symbol(),
"amqp:unauthorized-access"
);
assert_eq!(
AmqpErrorCondition::PreconditionFailed.as_symbol(),
"amqp:precondition-failed"
);
assert_eq!(
AmqpErrorCondition::FramingError.as_symbol(),
"amqp:connection:framing-error"
);
}
#[test]
fn description_renders_spec_section_then_text() {
let d = ErrorDescription::new("§7.2.1.3", "type-id collision detected");
assert_eq!(d.render(), "§7.2.1.3: type-id collision detected");
}
#[test]
fn description_display_matches_render() {
let d = ErrorDescription::new("§D.4.1", "correlation-id absent");
assert_eq!(alloc::format!("{d}"), d.render());
}
#[test]
fn invalid_utf8_maps_to_decode_error_transfer() {
let e = map_mapping_error(&MappingError::InvalidUtf8);
assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
assert_eq!(e.scope, ErrorScope::Transfer);
assert!(e.description.spec_section.contains("§11.1"));
}
#[test]
fn invalid_json_maps_to_decode_error() {
let e = map_mapping_error(&MappingError::InvalidJson("bad token".into()));
assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
assert!(e.description.message.contains("bad token"));
}
#[test]
fn empty_body_maps_to_decode_error() {
let e = map_mapping_error(&MappingError::EmptyBody);
assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
}
#[test]
fn no_route_with_dynamic_disabled_yields_not_found_link() {
let e = map_resolution_error(&ResolutionError::NoRoute("X".into()), false);
assert_eq!(e.condition, AmqpErrorCondition::NotFound);
assert_eq!(e.scope, ErrorScope::Link);
assert!(
e.description
.message
.contains("permit_dynamic_topics = false")
);
assert!(e.description.spec_section.contains("§7.5.1"));
}
#[test]
fn no_route_with_dynamic_enabled_still_not_found() {
let e = map_resolution_error(&ResolutionError::NoRoute("X".into()), true);
assert_eq!(e.condition, AmqpErrorCondition::NotFound);
assert!(
e.description
.message
.contains("dynamic-topic creation enabled")
);
}
#[test]
fn malformed_address_maps_to_decode_error() {
let e = map_resolution_error(&ResolutionError::Malformed("bad://".into()), false);
assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
assert_eq!(e.scope, ErrorScope::Link);
}
#[test]
fn resource_limit_exceeded_is_connection_scope() {
let e = resource_limit_exceeded("§7.10", "max-connections cap reached");
assert_eq!(e.condition, AmqpErrorCondition::ResourceLimitExceeded);
assert_eq!(e.scope, ErrorScope::Connection);
}
#[test]
fn unsettled_state_yields_not_implemented() {
let e = unsettled_state_not_implemented();
assert_eq!(e.condition, AmqpErrorCondition::NotImplemented);
assert_eq!(e.scope, ErrorScope::Link);
assert!(e.description.spec_section.contains("§7.4.2"));
}
#[test]
fn unknown_dds_operation_yields_not_implemented() {
let e = unknown_dds_operation("teleport");
assert_eq!(e.condition, AmqpErrorCondition::NotImplemented);
assert!(e.description.message.contains("teleport"));
}
#[test]
fn instance_unknown_yields_precondition_failed() {
let e = instance_unknown("unregister", "key-7");
assert_eq!(e.condition, AmqpErrorCondition::PreconditionFailed);
assert_eq!(e.scope, ErrorScope::Transfer);
assert!(e.description.message.contains("key-7"));
assert!(e.description.message.contains("unregister"));
}
#[test]
fn register_missing_key_yields_decode_error() {
let e = register_missing_key();
assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
assert!(e.description.message.contains("register"));
}
#[test]
fn access_denied_yields_unauthorized_access_link() {
let e = access_denied("CN=eve", "Sensor");
assert_eq!(e.condition, AmqpErrorCondition::UnauthorizedAccess);
assert_eq!(e.scope, ErrorScope::Link);
assert!(e.description.message.contains("CN=eve"));
assert!(e.description.message.contains("Sensor"));
}
}