use alloc::vec::Vec;
use crate::extended_types::AmqpExtValue;
use crate::types::{TypeError, codes};
/// Numeric descriptor codes identifying each performative.
///
/// The nine codes are consecutive, running from `0x10` (`OPEN`) through
/// `0x18` (`CLOSE`).
pub mod descriptor {
    /// Descriptor code for the `open` performative.
    pub const OPEN: u64 = 0x10;
    /// Descriptor code for the `begin` performative.
    pub const BEGIN: u64 = 0x11;
    /// Descriptor code for the `attach` performative.
    pub const ATTACH: u64 = 0x12;
    /// Descriptor code for the `flow` performative.
    pub const FLOW: u64 = 0x13;
    /// Descriptor code for the `transfer` performative.
    pub const TRANSFER: u64 = 0x14;
    /// Descriptor code for the `disposition` performative.
    pub const DISPOSITION: u64 = 0x15;
    /// Descriptor code for the `detach` performative.
    pub const DETACH: u64 = 0x16;
    /// Descriptor code for the `end` performative.
    pub const END: u64 = 0x17;
    /// Descriptor code for the `close` performative.
    pub const CLOSE: u64 = 0x18;
}
/// Leading byte of every encoded performative: `encode_performative` writes it
/// before the descriptor, and `decode_performative` rejects input whose first
/// byte differs from it.
const DESCRIBED_PREFIX: u8 = 0x00;
pub fn encode_performative(descriptor: u64, body: &AmqpExtValue) -> Result<Vec<u8>, TypeError> {
let mut out = alloc::vec![DESCRIBED_PREFIX];
out.extend_from_slice(&AmqpExtValue::Ulong(descriptor).encode()?);
out.extend_from_slice(&body.encode()?);
Ok(out)
}
/// Parses a performative from the front of `bytes`.
///
/// Expects the layout produced by `encode_performative`: the
/// `DESCRIBED_PREFIX` byte, a descriptor value, then a body value.
///
/// Returns `(descriptor, body, consumed)`, where `consumed` is the total number
/// of bytes read from `bytes`, including the prefix byte.
///
/// # Errors
///
/// * `TypeError::Truncated` if `bytes` is empty, or if the descriptor decoder
///   reports a length that runs past the end of `bytes`.
/// * `TypeError::UnsupportedFormatCode(b)` if the first byte `b` is not
///   `DESCRIBED_PREFIX`. This is reported separately from truncation so that
///   callers waiting for more data on `Truncated` do not stall on malformed
///   input.
/// * `TypeError::UnsupportedFormatCode(codes::ULONG)` if the descriptor decodes
///   to anything other than `AmqpExtValue::Ulong`.
/// * Any error returned by `AmqpExtValue::decode` for the descriptor or body.
pub fn decode_performative(bytes: &[u8]) -> Result<(u64, AmqpExtValue, usize), TypeError> {
    // An empty buffer is genuinely truncated; a wrong leading byte is not.
    let (&prefix, after_prefix) = bytes.split_first().ok_or(TypeError::Truncated)?;
    if prefix != DESCRIBED_PREFIX {
        return Err(TypeError::UnsupportedFormatCode(prefix));
    }

    let (desc, desc_len) = AmqpExtValue::decode(after_prefix)?;
    let descriptor = match desc {
        AmqpExtValue::Ulong(u) => u,
        _ => return Err(TypeError::UnsupportedFormatCode(codes::ULONG)),
    };

    // Checked slicing: never panic even if the inner decoder over-reports its length.
    let body_start = 1 + desc_len;
    let body_bytes = bytes.get(body_start..).ok_or(TypeError::Truncated)?;
    let (body, body_len) = AmqpExtValue::decode(body_bytes)?;

    Ok((descriptor, body, body_start + body_len))
}
/// Builds an encoded `open` performative for the given container id.
///
/// The body is a five-element list: `container_id` as a string followed by
/// four null entries.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn open(container_id: &str) -> Result<Vec<u8>, TypeError> {
    let mut fields = alloc::vec![AmqpExtValue::Str(container_id.into())];
    // Four unset slots follow the container id.
    for _ in 0..4 {
        fields.push(AmqpExtValue::Null);
    }
    encode_performative(descriptor::OPEN, &AmqpExtValue::List(fields))
}
/// Builds an encoded `begin` performative.
///
/// The body is a four-element list: the remote channel as a ushort (or null
/// when `remote_channel` is `None`), then `next_outgoing_id`,
/// `incoming_window` and `outgoing_window` as uints, in that order.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn begin(
    remote_channel: Option<u16>,
    next_outgoing_id: u32,
    incoming_window: u32,
    outgoing_window: u32,
) -> Result<Vec<u8>, TypeError> {
    let channel_field = remote_channel.map_or(AmqpExtValue::Null, AmqpExtValue::Ushort);
    let fields = alloc::vec![
        channel_field,
        AmqpExtValue::Uint(next_outgoing_id),
        AmqpExtValue::Uint(incoming_window),
        AmqpExtValue::Uint(outgoing_window),
    ];
    encode_performative(descriptor::BEGIN, &AmqpExtValue::List(fields))
}
/// Builds an encoded `attach` performative.
///
/// The body is a three-element list: the link `name` as a string, `handle`
/// as a uint, and a role boolean that is the negation of `is_sender`
/// (`false` for a sender, `true` otherwise).
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn attach(name: &str, handle: u32, is_sender: bool) -> Result<Vec<u8>, TypeError> {
    // The role flag is stored inverted relative to `is_sender`.
    let role = AmqpExtValue::Boolean(!is_sender);
    let fields = alloc::vec![
        AmqpExtValue::Str(name.into()),
        AmqpExtValue::Uint(handle),
        role,
    ];
    encode_performative(descriptor::ATTACH, &AmqpExtValue::List(fields))
}
/// Builds an encoded `flow` performative.
///
/// The body is a four-element list: a null in the first position, then
/// `incoming_window`, `next_outgoing_id` and `outgoing_window` as uints, in
/// that order.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn flow(
    incoming_window: u32,
    next_outgoing_id: u32,
    outgoing_window: u32,
) -> Result<Vec<u8>, TypeError> {
    // The first slot is always left unset by this builder.
    let mut fields = alloc::vec![AmqpExtValue::Null];
    fields.extend(
        [incoming_window, next_outgoing_id, outgoing_window]
            .iter()
            .map(|&value| AmqpExtValue::Uint(value)),
    );
    encode_performative(descriptor::FLOW, &AmqpExtValue::List(fields))
}
/// Builds an encoded `transfer` performative.
///
/// The body is a five-element list: `handle` as a uint, the delivery id as a
/// uint (or null when `delivery_id` is `None`), two null entries, and the
/// `settled` flag as a boolean.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn transfer(
    handle: u32,
    delivery_id: Option<u32>,
    settled: bool,
) -> Result<Vec<u8>, TypeError> {
    let delivery_field = delivery_id.map_or(AmqpExtValue::Null, AmqpExtValue::Uint);
    let fields = alloc::vec![
        AmqpExtValue::Uint(handle),
        delivery_field,
        // Two unset slots sit between the delivery id and the settled flag.
        AmqpExtValue::Null,
        AmqpExtValue::Null,
        AmqpExtValue::Boolean(settled),
    ];
    encode_performative(descriptor::TRANSFER, &AmqpExtValue::List(fields))
}
/// Builds an encoded `disposition` performative.
///
/// The body is a four-element list: `is_receiver` as a boolean, `first` as a
/// uint, `last` as a uint (or null when `None`), and `settled` as a boolean.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn disposition(
    is_receiver: bool,
    first: u32,
    last: Option<u32>,
    settled: bool,
) -> Result<Vec<u8>, TypeError> {
    let last_field = last.map_or(AmqpExtValue::Null, AmqpExtValue::Uint);
    let fields = alloc::vec![
        AmqpExtValue::Boolean(is_receiver),
        AmqpExtValue::Uint(first),
        last_field,
        AmqpExtValue::Boolean(settled),
    ];
    encode_performative(descriptor::DISPOSITION, &AmqpExtValue::List(fields))
}
/// Builds an encoded `detach` performative.
///
/// The body is a two-element list: `handle` as a uint followed by `closed`
/// as a boolean.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn detach(handle: u32, closed: bool) -> Result<Vec<u8>, TypeError> {
    let handle_field = AmqpExtValue::Uint(handle);
    let closed_field = AmqpExtValue::Boolean(closed);
    encode_performative(
        descriptor::DETACH,
        &AmqpExtValue::List(alloc::vec![handle_field, closed_field]),
    )
}
/// Builds an encoded `end` performative whose body is a list holding a single
/// null entry.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn end() -> Result<Vec<u8>, TypeError> {
    encode_performative(
        descriptor::END,
        &AmqpExtValue::List(alloc::vec![AmqpExtValue::Null]),
    )
}
/// Builds an encoded `close` performative whose body is a list holding a
/// single null entry.
///
/// # Errors
///
/// Propagates any error from `encode_performative`.
pub fn close() -> Result<Vec<u8>, TypeError> {
    encode_performative(
        descriptor::CLOSE,
        &AmqpExtValue::List(alloc::vec![AmqpExtValue::Null]),
    )
}
// Unit tests for the performative builders and the encode/decode pair.
// `expect`/`unwrap`/`panic` are allowed here because a failure should abort the test.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Encodes `body` under descriptor `d`, decodes it again, and checks that
    /// the descriptor, the body and the consumed byte count all match.
    fn round(d: u64, body: AmqpExtValue) {
        let bytes = encode_performative(d, &body).expect("encode");
        let (desc, parsed, consumed) = decode_performative(&bytes).expect("decode");
        assert_eq!(desc, d);
        assert_eq!(parsed, body);
        assert_eq!(consumed, bytes.len(), "decoder must consume the whole encoding");
    }

    /// Decodes `bytes` and returns the descriptor together with the list
    /// fields of the body, panicking if the body is not a list.
    fn decode_fields(bytes: &[u8]) -> (u64, Vec<AmqpExtValue>) {
        let (desc, body, _) = decode_performative(bytes).expect("decode");
        match body {
            AmqpExtValue::List(items) => (desc, items),
            other => panic!("expected list body, got {:?}", other),
        }
    }

    #[test]
    fn open_descriptor_is_0x10() {
        let bytes = open("client-1").expect("encode");
        let (desc, _, _) = decode_performative(&bytes).expect("decode");
        assert_eq!(desc, descriptor::OPEN);
    }

    #[test]
    fn begin_descriptor_is_0x11() {
        let bytes = begin(None, 0, 100, 100).expect("encode");
        let (desc, _, _) = decode_performative(&bytes).expect("decode");
        assert_eq!(desc, descriptor::BEGIN);
    }

    #[test]
    fn attach_carries_role_bit() {
        let bytes = attach("link-1", 0, true).expect("encode");
        let (_desc, items) = decode_fields(&bytes);
        // A sender is encoded with the role flag cleared.
        assert_eq!(items[2], AmqpExtValue::Boolean(false));
    }

    #[test]
    fn flow_carries_window_values() {
        let bytes = flow(50, 7, 100).expect("encode");
        let (desc, items) = decode_fields(&bytes);
        assert_eq!(desc, descriptor::FLOW);
        // All three caller-supplied values must land in their own slot.
        assert_eq!(items[1], AmqpExtValue::Uint(50));
        assert_eq!(items[2], AmqpExtValue::Uint(7));
        assert_eq!(items[3], AmqpExtValue::Uint(100));
    }

    #[test]
    fn transfer_default_settled_false() {
        let bytes = transfer(0, None, false).expect("encode");
        let (desc, items) = decode_fields(&bytes);
        assert_eq!(desc, descriptor::TRANSFER);
        assert_eq!(items[0], AmqpExtValue::Uint(0));
        assert_eq!(items[4], AmqpExtValue::Boolean(false));
    }

    #[test]
    fn disposition_settled_round_trip() {
        let bytes = disposition(true, 0, Some(10), true).expect("encode");
        let (desc, items) = decode_fields(&bytes);
        assert_eq!(desc, descriptor::DISPOSITION);
        assert_eq!(items[0], AmqpExtValue::Boolean(true));
        assert_eq!(items[1], AmqpExtValue::Uint(0));
        assert_eq!(items[2], AmqpExtValue::Uint(10));
        assert_eq!(items[3], AmqpExtValue::Boolean(true));
    }

    #[test]
    fn detach_round_trips() {
        let bytes = detach(7, true).expect("encode");
        let (desc, items) = decode_fields(&bytes);
        assert_eq!(desc, descriptor::DETACH);
        assert_eq!(items[0], AmqpExtValue::Uint(7));
        assert_eq!(items[1], AmqpExtValue::Boolean(true));
    }

    #[test]
    fn end_and_close_are_minimal() {
        let e = end().expect("encode");
        assert_eq!(decode_performative(&e).expect("decode").0, descriptor::END);
        let c = close().expect("encode");
        assert_eq!(
            decode_performative(&c).expect("decode").0,
            descriptor::CLOSE
        );
    }

    #[test]
    fn arbitrary_descriptor_round_trip() {
        round(
            0xDEAD_BEEF,
            AmqpExtValue::List(alloc::vec![AmqpExtValue::Int(42)]),
        );
    }

    #[test]
    fn truncated_input_yields_error() {
        // Empty input and input with a wrong leading byte must both be rejected.
        assert!(decode_performative(&[]).is_err());
        assert!(decode_performative(&[0xFF, 0x00]).is_err());
    }

    #[test]
    fn all_nine_descriptors_have_unique_values() {
        let descs = alloc::vec![
            descriptor::OPEN,
            descriptor::BEGIN,
            descriptor::ATTACH,
            descriptor::FLOW,
            descriptor::TRANSFER,
            descriptor::DISPOSITION,
            descriptor::DETACH,
            descriptor::END,
            descriptor::CLOSE,
        ];
        let mut sorted = descs.clone();
        sorted.sort_unstable();
        sorted.dedup();
        assert_eq!(sorted.len(), descs.len(), "descriptors must be unique");
    }
}