use amqp_serde::{
from_bytes,
types::{AmqpChannelId, LongUint, Octect, ShortUint},
};
use serde::{Deserialize, Serialize};
use std::fmt;
#[macro_use]
mod helpers {
    /// Implements `header()` and `into_frame()` for one method type.
    ///
    /// `$name` is both the method type and the name of its `Frame` variant;
    /// `$class_id` / `$method_id` are the ids stored in its static `MethodHeader`.
    macro_rules! impl_method_frame {
        ($name:ident, $class_id:literal, $method_id:literal) => {
            impl $name {
                /// Returns the static method header (class id, method id) of this method type.
                pub(crate) fn header() -> &'static MethodHeader {
                    static __METHOD_HEADER: MethodHeader = MethodHeader::new($class_id, $method_id);
                    &__METHOD_HEADER
                }

                /// Wraps this method, together with its header, in the matching `Frame` variant.
                pub(crate) fn into_frame(self) -> Frame {
                    Frame::$name(Self::header(), self)
                }
            }
        };
    }

    /// Generates, from a `class id => method id: Type, ...; ...` table:
    ///
    /// * `decode_method_frame`, which dispatches on (class id, method id),
    /// * `header()` / `into_frame()` for every listed method type,
    /// * the `Frame` enum, with one variant per method type plus the
    ///   heartbeat / content variants.
    macro_rules! impl_frame {
        ($($class_id:literal => $($method_id:literal : $method:ident),+);+) => {
            /// Deserializes the arguments of a method frame into the type
            /// registered for `header`'s class id and method id.
            ///
            /// # Errors
            ///
            /// Returns `Error::Corrupted` when the (class id, method id) pair is
            /// not in the table; the ids come off the wire, so an unknown pair
            /// must not panic. Deserialization failures are propagated via `?`.
            fn decode_method_frame(header: MethodHeader, content: &[u8]) -> Result<Frame, Error> {
                match header.class_id() {
                    $($class_id => {
                        match header.method_id() {
                            $($method_id => Ok(from_bytes::<$method>(content)?.into_frame()),)+
                            // unknown method id within a known class
                            _ => Err(Error::Corrupted),
                        }
                    })+
                    // unknown class id
                    _ => Err(Error::Corrupted),
                }
            }

            $($(impl_method_frame!{$method, $class_id, $method_id})+)+

            /// Every frame this module can represent.
            ///
            /// Method variants carry a reference to their static `MethodHeader`
            /// followed by the method arguments.
            #[derive(Debug, Serialize)]
            #[serde(untagged)]
            pub enum Frame {
                $($($method(&'static MethodHeader, $method),)+)+
                HeartBeat(HeartBeat),
                ContentHeader(Box<ContentHeader>),
                ContentBody(ContentBody),
                PublishCombo(Publish, Box<ContentHeader>, ContentBody),
            }
        };
    }
}
mod constants;
mod content_body;
mod content_header;
mod error;
mod heartbeat;
mod method;
mod protocol_header;
pub use constants::*;
pub use content_body::*;
pub use content_header::*;
pub use error::*;
pub use heartbeat::*;
pub use method::*;
pub use protocol_header::*;
// Method table consumed by `impl_frame!`.
//
// Syntax: `class id => method id: MethodType, ...;` with one `;`-separated
// group per class. The macro expands this into the `Frame` enum (one variant
// per method type listed here), `header()` / `into_frame()` on each method
// type, and `decode_method_frame`, which dispatches on (class id, method id).
//
// The class names in the comments below are inferred from the method type
// names; confirm them against the AMQP 0-9-1 / RabbitMQ references.
impl_frame! {
// class 10: presumably `connection`
10 => 10: Start,
11: StartOk,
20: Secure,
21: SecureOk,
30: Tune,
31: TuneOk,
40: Open,
41: OpenOk,
50: Close,
51: CloseOk,
60: Blocked,
61: Unblocked,
70: UpdateSecret,
71: UpdateSecretOk;
// class 20: presumably `channel`
20 => 10: OpenChannel,
11: OpenChannelOk,
20: Flow,
21: FlowOk,
40: CloseChannel,
41: CloseChannelOk;
// class 40: presumably `exchange`
40 => 10: Declare,
11: DeclareOk,
20: Delete,
21: DeleteOk,
30: Bind,
31: BindOk,
40: Unbind,
// NOTE(review): 51 (not 41) breaks the x0/x1 pairing used by the other
// entries of this class; it appears to match RabbitMQ's `exchange.unbind-ok`
// method id. Confirm against the protocol reference before "fixing" it.
51: UnbindOk;
// class 50: presumably `queue`
50 => 10: DeclareQueue,
11: DeclareQueueOk,
20: BindQueue,
21: BindQueueOk,
30: PurgeQueue,
31: PurgeQueueOk,
40: DeleteQueue,
41: DeleteQueueOk,
50: UnbindQueue,
51: UnbindQueueOk;
// class 60: presumably `basic`
60 => 10: Qos,
11: QosOk,
20: Consume,
21: ConsumeOk,
30: Cancel,
31: CancelOk,
40: Publish,
50: Return,
60: Deliver,
70: Get,
71: GetOk,
72: GetEmpty,
80: Ack,
90: Reject,
110: Recover,
111: RecoverOk,
120: Nack;
// class 85: presumably `confirm`
85 => 10: Select,
11: SelectOk;
// class 90: presumably `tx`
90 => 10: TxSelect,
11: TxSelectOk,
20: TxCommit,
21: TxCommitOk,
30: TxRollback,
31: TxRollbackOk
}
/// Fixed-size header found at the start of every frame.
///
/// `Frame::decode` deserializes it from the first `FRAME_HEADER_SIZE` bytes of
/// the input buffer. Field order is the serialization order and must not change.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FrameHeader {
    /// Frame type octet; `Frame::decode` matches it against the `FRAME_*` constants.
    pub frame_type: Octect,
    /// Channel the frame belongs to.
    pub channel: ShortUint,
    /// Payload length in bytes, excluding this header and the trailing
    /// frame-end octet.
    pub payload_size: LongUint,
}
/// `Display` for `Frame` prints the same text as its `Debug` representation.
impl fmt::Display for Frame {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Format through a fresh `{:?}` spec so caller-supplied formatter
        // flags (width, `#`, ...) are not forwarded to the Debug impl.
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl Frame {
pub fn get_frame_type(&self) -> Octect {
match self {
Frame::HeartBeat(_) => FRAME_HEARTBEAT,
Frame::ContentHeader(_) => FRAME_CONTENT_HEADER,
Frame::ContentBody(_) => FRAME_CONTENT_BODY,
_ => FRAME_METHOD,
}
}
pub fn decode(buf: &[u8]) -> Result<Option<(usize, AmqpChannelId, Frame)>, Error> {
if buf.len() < FRAME_HEADER_SIZE {
return Ok(None);
}
let FrameHeader {
frame_type,
channel,
payload_size,
} = from_bytes(match buf.get(0..FRAME_HEADER_SIZE) {
Some(s) => s,
None => unreachable!("out of bound"),
})?;
let total_size = payload_size as usize + FRAME_HEADER_SIZE + 1;
if total_size > buf.len() {
return Ok(None);
}
match buf.get(total_size - 1) {
Some(v) => {
if &FRAME_END != v {
return Err(Error::Corrupted);
}
}
None => unreachable!("out of bound"),
};
match frame_type {
FRAME_METHOD => {
let header: MethodHeader =
from_bytes(match buf.get(FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + 4) {
Some(s) => s,
None => unreachable!("out of bound"),
})?;
let method_raw = match buf.get(FRAME_HEADER_SIZE + 4..total_size - 1) {
Some(s) => s,
None => unreachable!("out of bound"),
};
let frame = decode_method_frame(header, method_raw)?;
Ok(Some((total_size, channel, frame)))
}
FRAME_HEARTBEAT => Ok(Some((total_size, channel, Frame::HeartBeat(HeartBeat)))),
FRAME_CONTENT_HEADER => {
let mut start = FRAME_HEADER_SIZE;
let mut end = start + 12;
let header_common: ContentHeaderCommon = from_bytes(match buf.get(start..end) {
Some(s) => s,
None => unreachable!("out of bound"),
})?;
start = end;
end = total_size - 1;
let basic_properties: BasicProperties = from_bytes(match buf.get(start..end) {
Some(s) => s,
None => unreachable!("out of bound"),
})?;
Ok(Some((
total_size,
channel,
Frame::ContentHeader(Box::new(ContentHeader::new(
header_common,
basic_properties,
))),
)))
}
FRAME_CONTENT_BODY => {
let start = FRAME_HEADER_SIZE;
let end = total_size - 1;
let body = buf.get(start..end).expect("should never fail");
Ok(Some((
total_size,
channel,
Frame::ContentBody(ContentBody::new(body.to_vec())),
)))
}
_ => Err(Error::Corrupted),
}
}
}