use alloc::vec::Vec;
use crate::extended_types::AmqpExtValue;
use crate::performatives::{decode_performative, encode_performative};
use crate::types::TypeError;
/// Numeric descriptor codes that tag each kind of message section.
///
/// `MessageSection::encode` passes one of these codes to
/// `encode_performative`, and `MessageSection::decode` matches on them to pick
/// the variant to build. The values appear to follow the section descriptors
/// of the AMQP 1.0 messaging specification (section 3.2) -- verify against the
/// specification before changing any of them.
pub mod descriptor {
    /// Descriptor code of the header section.
    pub const HEADER: u64 = 0x70;
    /// Descriptor code of the delivery-annotations section.
    pub const DELIVERY_ANNOTATIONS: u64 = 0x71;
    /// Descriptor code of the message-annotations section.
    pub const MESSAGE_ANNOTATIONS: u64 = 0x72;
    /// Descriptor code of the properties section.
    pub const PROPERTIES: u64 = 0x73;
    /// Descriptor code of the application-properties section.
    pub const APPLICATION_PROPERTIES: u64 = 0x74;
    /// Descriptor code of a data (binary body) section.
    pub const DATA: u64 = 0x75;
    /// Descriptor code of an amqp-sequence (list body) section.
    pub const AMQP_SEQUENCE: u64 = 0x76;
    /// Descriptor code of an amqp-value (single value body) section.
    pub const AMQP_VALUE: u64 = 0x77;
    /// Descriptor code of the footer section.
    pub const FOOTER: u64 = 0x78;
}
/// One section of a message, tagged by kind.
///
/// Each variant corresponds to one constant in [`descriptor`]. Most variants
/// carry their payload as an uninterpreted [`AmqpExtValue`]; this type does not
/// check the payload's shape for those variants.
#[derive(Debug, Clone, PartialEq)]
pub enum MessageSection {
/// Header section; payload is stored as-is.
Header(AmqpExtValue),
/// Delivery-annotations section; payload is stored as-is.
DeliveryAnnotations(AmqpExtValue),
/// Message-annotations section; payload is stored as-is.
MessageAnnotations(AmqpExtValue),
/// Properties section; payload is stored as-is.
Properties(AmqpExtValue),
/// Application-properties section; payload is stored as-is.
ApplicationProperties(AmqpExtValue),
/// Body section holding raw bytes; encoded as `AmqpExtValue::Binary`.
Data(Vec<u8>),
/// Body section holding a sequence of values; encoded as `AmqpExtValue::List`.
AmqpSequence(Vec<AmqpExtValue>),
/// Body section holding a single value; payload is stored as-is.
AmqpValue(AmqpExtValue),
/// Footer section; payload is stored as-is.
Footer(AmqpExtValue),
}
impl MessageSection {
pub fn encode(&self) -> Result<Vec<u8>, TypeError> {
let (desc, body) = match self {
Self::Header(v) => (descriptor::HEADER, v.clone()),
Self::DeliveryAnnotations(v) => (descriptor::DELIVERY_ANNOTATIONS, v.clone()),
Self::MessageAnnotations(v) => (descriptor::MESSAGE_ANNOTATIONS, v.clone()),
Self::Properties(v) => (descriptor::PROPERTIES, v.clone()),
Self::ApplicationProperties(v) => (descriptor::APPLICATION_PROPERTIES, v.clone()),
Self::Data(b) => (descriptor::DATA, AmqpExtValue::Binary(b.clone())),
Self::AmqpSequence(items) => {
(descriptor::AMQP_SEQUENCE, AmqpExtValue::List(items.clone()))
}
Self::AmqpValue(v) => (descriptor::AMQP_VALUE, v.clone()),
Self::Footer(v) => (descriptor::FOOTER, v.clone()),
};
encode_performative(desc, &body)
}
pub fn decode(bytes: &[u8]) -> Result<(Self, usize), TypeError> {
let (desc, body, n) = decode_performative(bytes)?;
let s = match desc {
descriptor::HEADER => Self::Header(body),
descriptor::DELIVERY_ANNOTATIONS => Self::DeliveryAnnotations(body),
descriptor::MESSAGE_ANNOTATIONS => Self::MessageAnnotations(body),
descriptor::PROPERTIES => Self::Properties(body),
descriptor::APPLICATION_PROPERTIES => Self::ApplicationProperties(body),
descriptor::DATA => match body {
AmqpExtValue::Binary(b) => Self::Data(b),
_ => return Err(TypeError::UnsupportedFormatCode(0)),
},
descriptor::AMQP_SEQUENCE => match body {
AmqpExtValue::List(items) => Self::AmqpSequence(items),
_ => return Err(TypeError::UnsupportedFormatCode(0)),
},
descriptor::AMQP_VALUE => Self::AmqpValue(body),
descriptor::FOOTER => Self::Footer(body),
_ => return Err(TypeError::UnsupportedFormatCode(0)),
};
Ok((s, n))
}
#[must_use]
pub fn order(&self) -> u8 {
match self {
Self::Header(_) => 0,
Self::DeliveryAnnotations(_) => 1,
Self::MessageAnnotations(_) => 2,
Self::Properties(_) => 3,
Self::ApplicationProperties(_) => 4,
Self::Data(_) | Self::AmqpSequence(_) | Self::AmqpValue(_) => 5,
Self::Footer(_) => 6,
}
}
}
pub fn validate_section_sequence(sections: &[MessageSection]) -> Result<(), TypeError> {
let mut last_order = 0;
for s in sections {
let o = s.order();
if o < last_order {
return Err(TypeError::Truncated);
}
last_order = o;
}
Ok(())
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use alloc::string::ToString;

    /// Encodes `section`, decodes the resulting bytes, and asserts that the
    /// decoded section equals the input.
    fn assert_round_trip(section: MessageSection) {
        let wire = section.encode().expect("encode");
        let (decoded, _) = MessageSection::decode(&wire).expect("decode");
        assert_eq!(decoded, section);
    }

    #[test]
    fn header_section_round_trips() {
        let fields = alloc::vec![AmqpExtValue::Boolean(true)];
        assert_round_trip(MessageSection::Header(AmqpExtValue::List(fields)));
    }

    #[test]
    fn properties_section_round_trips_with_subject() {
        // Three null fields followed by a string in the fourth position.
        let fields = alloc::vec![
            AmqpExtValue::Null,
            AmqpExtValue::Null,
            AmqpExtValue::Null,
            AmqpExtValue::Str("TrackingResult".to_string()),
        ];
        assert_round_trip(MessageSection::Properties(AmqpExtValue::List(fields)));
    }

    #[test]
    fn application_properties_round_trips() {
        let entry = (
            AmqpExtValue::Str("dds:domain-id".to_string()),
            AmqpExtValue::Int(0),
        );
        let map = AmqpExtValue::Map(alloc::vec![entry]);
        assert_round_trip(MessageSection::ApplicationProperties(map));
    }

    #[test]
    fn data_section_round_trips_binary() {
        let payload = alloc::vec![0xDE, 0xAD, 0xBE, 0xEF];
        assert_round_trip(MessageSection::Data(payload));
    }

    #[test]
    fn amqp_value_section_round_trips() {
        let value = AmqpExtValue::Str("hello".to_string());
        assert_round_trip(MessageSection::AmqpValue(value));
    }

    #[test]
    fn amqp_sequence_round_trips() {
        let items = alloc::vec![
            AmqpExtValue::Int(1),
            AmqpExtValue::Int(2),
            AmqpExtValue::Int(3),
        ];
        assert_round_trip(MessageSection::AmqpSequence(items));
    }

    #[test]
    fn footer_section_round_trips() {
        let empty_map = AmqpExtValue::Map(Vec::new());
        assert_round_trip(MessageSection::Footer(empty_map));
    }

    #[test]
    fn canonical_order_passes_validator() {
        let sections = alloc::vec![
            MessageSection::Header(AmqpExtValue::Null),
            MessageSection::Properties(AmqpExtValue::Null),
            MessageSection::Data(alloc::vec![0]),
            MessageSection::Footer(AmqpExtValue::Null),
        ];
        assert!(validate_section_sequence(&sections).is_ok());
    }

    #[test]
    fn out_of_order_fails_validator() {
        // Properties ranks after Header, so this pair steps backwards.
        let sections = alloc::vec![
            MessageSection::Properties(AmqpExtValue::Null),
            MessageSection::Header(AmqpExtValue::Null),
        ];
        assert!(validate_section_sequence(&sections).is_err());
    }

    #[test]
    fn all_seven_sections_have_unique_order_indexes() {
        // One representative per rank; `Data` stands in for the body variants.
        let ranks = alloc::vec![
            MessageSection::Header(AmqpExtValue::Null).order(),
            MessageSection::DeliveryAnnotations(AmqpExtValue::Null).order(),
            MessageSection::MessageAnnotations(AmqpExtValue::Null).order(),
            MessageSection::Properties(AmqpExtValue::Null).order(),
            MessageSection::ApplicationProperties(AmqpExtValue::Null).order(),
            MessageSection::Data(Vec::new()).order(),
            MessageSection::Footer(AmqpExtValue::Null).order(),
        ];
        let distinct: alloc::collections::BTreeSet<u8> = ranks.into_iter().collect();
        assert_eq!(distinct.len(), 7);
    }
}