// metalmq_codec/frame/mod.rs

1mod basic;
2mod channel;
3mod connection;
4mod exchange;
5mod queue;
6
7pub use self::{
8    basic::{
9        basic_get_empty, confirm_select, confirm_select_ok, BasicAckArgs, BasicCancelArgs, BasicCancelOkArgs,
10        BasicConsumeArgs, BasicConsumeFlags, BasicConsumeOkArgs, BasicDeliverArgs, BasicGetArgs, BasicGetOkArgs,
11        BasicPublishArgs, BasicPublishFlags, BasicRejectArgs, BasicReturnArgs, ConfirmSelectArgs,
12    },
13    channel::{channel_close, channel_close_ok, channel_open, channel_open_ok, ChannelCloseArgs},
14    connection::{
15        connection_close, connection_close_ok, connection_open_ok, connection_tune, connection_tune_ok,
16        ConnectionCloseArgs, ConnectionOpenArgs, ConnectionStartArgs, ConnectionStartOkArgs, ConnectionTuneArgs,
17        ConnectionTuneOkArgs,
18    },
19    exchange::{
20        exchange_declare_ok, exchange_delete_ok, ExchangeDeclareArgs, ExchangeDeclareFlags, ExchangeDeleteArgs,
21        ExchangeDeleteFlags,
22    },
23    queue::{
24        queue_bind_ok, QueueBindArgs, QueueDeclareArgs, QueueDeclareFlags, QueueDeclareOkArgs, QueueDeleteArgs,
25        QueueDeleteFlags, QueueDeleteOkArgs, QueuePurgeArgs, QueuePurgeOkArgs, QueueUnbindArgs,
26    },
27};
28use std::collections::HashMap;
29
// Connection class (class id 10). Every code is `(class id << 16) | method id`.

/// `connection.start`: class 10, method 10.
pub const CONNECTION_START: u32 = (10 << 16) | 10;
/// `connection.start-ok`: class 10, method 11.
pub const CONNECTION_START_OK: u32 = (10 << 16) | 11;
/// Class 10, method 20.
// NOTE(review): the AMQP 0-9-1 spec calls methods 20/21 `secure`/`secure-ok` — confirm the naming.
pub const CONNECTION_SECRET: u32 = (10 << 16) | 20;
/// Class 10, method 21.
pub const CONNECTION_SECRET_OK: u32 = (10 << 16) | 21;
/// `connection.tune`: class 10, method 30.
pub const CONNECTION_TUNE: u32 = (10 << 16) | 30;
/// `connection.tune-ok`: class 10, method 31.
pub const CONNECTION_TUNE_OK: u32 = (10 << 16) | 31;
/// `connection.open`: class 10, method 40.
pub const CONNECTION_OPEN: u32 = (10 << 16) | 40;
/// `connection.open-ok`: class 10, method 41.
pub const CONNECTION_OPEN_OK: u32 = (10 << 16) | 41;
/// `connection.close`: class 10, method 50.
pub const CONNECTION_CLOSE: u32 = (10 << 16) | 50;
/// `connection.close-ok`: class 10, method 51.
pub const CONNECTION_CLOSE_OK: u32 = (10 << 16) | 51;
40
// Channel class (class id 20). Every code is `(class id << 16) | method id`.

/// `channel.open`: class 20, method 10.
pub const CHANNEL_OPEN: u32 = (20 << 16) | 10;
/// `channel.open-ok`: class 20, method 11.
pub const CHANNEL_OPEN_OK: u32 = (20 << 16) | 11;
/// `channel.flow`: class 20, method 20.
pub const CHANNEL_FLOW: u32 = (20 << 16) | 20;
/// `channel.flow-ok`: class 20, method 21.
pub const CHANNEL_FLOW_OK: u32 = (20 << 16) | 21;
/// `channel.close`: class 20, method 40.
pub const CHANNEL_CLOSE: u32 = (20 << 16) | 40;
/// `channel.close-ok`: class 20, method 41.
pub const CHANNEL_CLOSE_OK: u32 = (20 << 16) | 41;
47
// Exchange class (class id 40). Every code is `(class id << 16) | method id`.

/// `exchange.declare`: class 40, method 10.
pub const EXCHANGE_DECLARE: u32 = (40 << 16) | 10;
/// `exchange.declare-ok`: class 40, method 11.
pub const EXCHANGE_DECLARE_OK: u32 = (40 << 16) | 11;
/// `exchange.delete`: class 40, method 20.
pub const EXCHANGE_DELETE: u32 = (40 << 16) | 20;
/// `exchange.delete-ok`: class 40, method 21.
pub const EXCHANGE_DELETE_OK: u32 = (40 << 16) | 21;
52
// Queue class (class id 50). Every code is `(class id << 16) | method id`.

/// `queue.declare`: class 50, method 10.
pub const QUEUE_DECLARE: u32 = (50 << 16) | 10;
/// `queue.declare-ok`: class 50, method 11.
pub const QUEUE_DECLARE_OK: u32 = (50 << 16) | 11;
/// `queue.bind`: class 50, method 20.
pub const QUEUE_BIND: u32 = (50 << 16) | 20;
/// `queue.bind-ok`: class 50, method 21.
pub const QUEUE_BIND_OK: u32 = (50 << 16) | 21;
/// `queue.purge`: class 50, method 30.
pub const QUEUE_PURGE: u32 = (50 << 16) | 30;
/// `queue.purge-ok`: class 50, method 31.
pub const QUEUE_PURGE_OK: u32 = (50 << 16) | 31;
/// `queue.delete`: class 50, method 40.
pub const QUEUE_DELETE: u32 = (50 << 16) | 40;
/// `queue.delete-ok`: class 50, method 41.
pub const QUEUE_DELETE_OK: u32 = (50 << 16) | 41;
/// `queue.unbind`: class 50, method 50.
pub const QUEUE_UNBIND: u32 = (50 << 16) | 50;
/// `queue.unbind-ok`: class 50, method 51.
pub const QUEUE_UNBIND_OK: u32 = (50 << 16) | 51;
63
// Basic class (class id 60). Every code is `(class id << 16) | method id`.

/// `basic.qos`: class 60, method 10.
pub const BASIC_QOS: u32 = (60 << 16) | 10;
/// `basic.qos-ok`: class 60, method 11.
pub const BASIC_QOS_OK: u32 = (60 << 16) | 11;
/// `basic.consume`: class 60, method 20.
pub const BASIC_CONSUME: u32 = (60 << 16) | 20;
/// `basic.consume-ok`: class 60, method 21.
pub const BASIC_CONSUME_OK: u32 = (60 << 16) | 21;
/// `basic.cancel`: class 60, method 30.
pub const BASIC_CANCEL: u32 = (60 << 16) | 30;
/// `basic.cancel-ok`: class 60, method 31.
pub const BASIC_CANCEL_OK: u32 = (60 << 16) | 31;
/// `basic.publish`: class 60, method 40.
pub const BASIC_PUBLISH: u32 = (60 << 16) | 40;
/// `basic.return`: class 60, method 50.
pub const BASIC_RETURN: u32 = (60 << 16) | 50;
/// `basic.deliver`: class 60, method 60.
pub const BASIC_DELIVER: u32 = (60 << 16) | 60;
/// `basic.get`: class 60, method 70.
pub const BASIC_GET: u32 = (60 << 16) | 70;
/// `basic.get-ok`: class 60, method 71.
pub const BASIC_GET_OK: u32 = (60 << 16) | 71;
/// `basic.get-empty`: class 60, method 72.
pub const BASIC_GET_EMPTY: u32 = (60 << 16) | 72;
/// `basic.ack`: class 60, method 80.
pub const BASIC_ACK: u32 = (60 << 16) | 80;
/// `basic.reject`: class 60, method 90.
pub const BASIC_REJECT: u32 = (60 << 16) | 90;
/// `basic.recover-async`: class 60, method 100.
pub const BASIC_RECOVER_ASYNC: u32 = (60 << 16) | 100;
/// `basic.recover`: class 60, method 110.
pub const BASIC_RECOVER: u32 = (60 << 16) | 110;
/// `basic.recover-ok`: class 60, method 111.
pub const BASIC_RECOVER_OK: u32 = (60 << 16) | 111;
81
// Confirm class (class id 85). Every code is `(class id << 16) | method id`.

/// `confirm.select`: class 85, method 10.
pub const CONFIRM_SELECT: u32 = (85 << 16) | 10;
/// `confirm.select-ok`: class 85, method 11.
pub const CONFIRM_SELECT_OK: u32 = (85 << 16) | 11;
84
/// Channel number a frame is sent on.
pub type Channel = u16;
/// Combined class id (upper 16 bits) and method id (lower 16 bits) of a method frame.
pub type ClassMethod = u32;
/// Class id part of a method code or content header.
pub type ClassId = u16;
/// Type of the `weight` field of a content header.
pub type Weight = u16;
89
90/// Represents an AMQP frame.
91pub enum AMQPFrame {
92    /// Header is to be sent to the server at first, announcing the AMQP version we support
93    Header,
94    /// Represents the AMQP RPC frames. Connection based calls have a channel number 0, otherwise
95    /// channel is the current channel on which the frames are sent. The RPC arguments are
96    /// represented in `MethodFrameArgs`.
97    Method(Channel, ClassMethod, MethodFrameArgs),
98    ContentHeader(ContentHeaderFrame),
99    ContentBody(ContentBodyFrame),
100    Heartbeat(Channel),
101}
102
103impl std::fmt::Debug for AMQPFrame {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        match self {
106            AMQPFrame::Header => write!(f, "Header"),
107            AMQPFrame::Method(ch, cm, args) => write!(f, "Method(channel={}, {:08X}, {:?})", ch, cm, args),
108            AMQPFrame::ContentHeader(ch) => write!(f, "ContentHeader({:?})", ch),
109            AMQPFrame::ContentBody(cb) => write!(f, "ContentBody({:?})", cb),
110            AMQPFrame::Heartbeat(_) => write!(f, "Heartbeat"),
111        }
112    }
113}
114
115/// Represents all types of method frame arguments.
116#[derive(Debug)]
117pub enum MethodFrameArgs {
118    ConnectionStart(ConnectionStartArgs),
119    ConnectionStartOk(ConnectionStartOkArgs),
120    ConnectionTune(ConnectionTuneArgs),
121    ConnectionTuneOk(ConnectionTuneOkArgs),
122    ConnectionOpen(ConnectionOpenArgs),
123    ConnectionOpenOk,
124    ConnectionClose(ConnectionCloseArgs),
125    ConnectionCloseOk,
126    ChannelOpen,
127    ChannelOpenOk,
128    ChannelClose(ChannelCloseArgs),
129    ChannelCloseOk,
130    ExchangeDeclare(ExchangeDeclareArgs),
131    ExchangeDeclareOk,
132    ExchangeDelete(ExchangeDeleteArgs),
133    ExchangeDeleteOk,
134    QueueDeclare(QueueDeclareArgs),
135    QueueDeclareOk(QueueDeclareOkArgs),
136    QueueBind(QueueBindArgs),
137    QueueBindOk,
138    QueuePurge(QueuePurgeArgs),
139    QueuePurgeOk(QueuePurgeOkArgs),
140    QueueDelete(QueueDeleteArgs),
141    QueueDeleteOk(QueueDeleteOkArgs),
142    QueueUnbind(QueueUnbindArgs),
143    QueueUnbindOk,
144    BasicConsume(BasicConsumeArgs),
145    BasicConsumeOk(BasicConsumeOkArgs),
146    BasicCancel(BasicCancelArgs),
147    BasicCancelOk(BasicCancelOkArgs),
148    BasicGet(BasicGetArgs),
149    BasicGetOk(BasicGetOkArgs),
150    BasicGetEmpty,
151    BasicPublish(BasicPublishArgs),
152    BasicReturn(BasicReturnArgs),
153    BasicDeliver(BasicDeliverArgs),
154    BasicAck(BasicAckArgs),
155    BasicReject(BasicRejectArgs),
156    ConfirmSelect(ConfirmSelectArgs),
157    ConfirmSelectOk,
158}
159
160bitflags! {
161    pub struct HeaderPropertyFlags: u16 {
162        const CLUSTER_ID       = 0b0000_0000_0000_0100;
163        const APP_ID           = 0b0000_0000_0000_1000;
164        const USER_ID          = 0b0000_0000_0001_0000;
165        const MESSAGE_TYPE     = 0b0000_0000_0010_0000;
166        const TIMESTAMP        = 0b0000_0000_0100_0000;
167        const MESSAGE_ID       = 0b0000_0000_1000_0000;
168        const EXPIRATION       = 0b0000_0001_0000_0000;
169        const REPLY_TO         = 0b0000_0010_0000_0000;
170        const CORRELATION_ID   = 0b0000_0100_0000_0000;
171        const PRIORITY         = 0b0000_1000_0000_0000;
172        const DELIVERY_MODE    = 0b0001_0000_0000_0000;
173        const HEADERS          = 0b0010_0000_0000_0000;
174        const CONTENT_ENCODING = 0b0100_0000_0000_0000;
175        const CONTENT_TYPE     = 0b1000_0000_0000_0000;
176    }
177}
178
179impl Default for HeaderPropertyFlags {
180    fn default() -> Self {
181        HeaderPropertyFlags::empty()
182    }
183}
184
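// FIXME The property flags field is 16 bits wide, but a message can carry more than 16 properties.
// Per the AMQP 0-9-1 specification, if the last bit (bit 0) of the flags is set, a further
// property flags field follows. This struct holds a single flags field only; a good example of
// a continued flags field is still needed.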
188#[derive(Debug, Default)]
189pub struct ContentHeaderFrame {
190    pub channel: Channel,
191    pub class_id: ClassId,
192    pub weight: Weight,
193    pub body_size: u64,
194    pub prop_flags: HeaderPropertyFlags,
195    pub cluster_id: Option<String>,
196    pub app_id: Option<String>,
197    pub user_id: Option<String>,
198    pub message_type: Option<String>,
199    pub timestamp: Option<u64>,
200    pub message_id: Option<String>,
201    pub expiration: Option<String>,
202    pub reply_to: Option<String>,
203    pub correlation_id: Option<String>,
204    pub priority: Option<u8>,
205    pub delivery_mode: Option<u8>,
206    pub headers: Option<FieldTable>,
207    pub content_encoding: Option<String>,
208    pub content_type: Option<String>,
209}
210
211impl ContentHeaderFrame {
212    pub fn with_content_type(&mut self, content_type: String) -> &ContentHeaderFrame {
213        self.content_type = Some(content_type);
214        self.prop_flags.set(HeaderPropertyFlags::CONTENT_TYPE, true);
215        self
216    }
217
218    pub fn frame(self) -> AMQPFrame {
219        AMQPFrame::ContentHeader(self)
220    }
221}
222
223pub struct ContentBodyFrame {
224    pub channel: Channel,
225    // TODO here we need something which can be cloned cheap like Box or Rc. Sometimes we can move
226    // out this from the struct and we can build a new struct. But we need to avoid the
227    // byte-to-byte copy.
228    pub body: Vec<u8>,
229}
230
231impl ContentBodyFrame {
232    pub fn frame(self) -> AMQPFrame {
233        AMQPFrame::ContentBody(self)
234    }
235}
236
237impl std::fmt::Debug for ContentBodyFrame {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        let body = String::from_utf8_lossy(&self.body[..std::cmp::min(64usize, self.body.len())]);
240
241        f.write_fmt(format_args!(
242            "ContentBodyFrame {{ channel: {}, body: \"{}\" }}",
243            &self.channel, body
244        ))
245    }
246}
247
248/// Type alias for inner type of field value.
249pub type FieldTable = HashMap<String, AMQPFieldValue>;
250
251#[derive(Debug)]
252pub enum AMQPValue {
253    //    Bool(bool),
254    U8(u8),
255    U16(u16),
256    U32(u32),
257    U64(u64),
258    SimpleString(String),
259    LongString(String),
260    EmptyFieldTable,
261    FieldTable(Box<FieldTable>),
262}
263
264#[derive(Clone, Debug, PartialEq)]
265pub enum AMQPFieldValue {
266    Bool(bool),
267    //    SimpleString(String),
268    LongString(String),
269    EmptyFieldTable,
270    FieldTable(Box<FieldTable>),
271}
272
273impl From<ContentHeaderFrame> for AMQPFrame {
274    fn from(chf: ContentHeaderFrame) -> AMQPFrame {
275        AMQPFrame::ContentHeader(chf)
276    }
277}
278
279impl From<ContentBodyFrame> for AMQPFrame {
280    fn from(cbf: ContentBodyFrame) -> AMQPFrame {
281        AMQPFrame::ContentBody(cbf)
282    }
283}
284
/// Splits a combined `u32` method code into `(class id, method id)`.
///
/// The class id is the upper 16 bits of `cm`, the method id the lower 16 bits.
pub fn split_class_method(cm: u32) -> (u16, u16) {
    // Big-endian byte order puts the two class bytes first and the two method bytes last.
    let [class_hi, class_lo, method_hi, method_lo] = cm.to_be_bytes();

    (
        u16::from_be_bytes([class_hi, class_lo]),
        u16::from_be_bytes([method_hi, method_lo]),
    )
}
292
/// Combines a class id and a method id into a single `u32` method code.
///
/// The class id ends up in the upper 16 bits, the method id in the lower 16 bits.
pub fn unify_class_method(class: u16, method: u16) -> u32 {
    let class_bits = u32::from(class) << 16;
    let method_bits = u32::from(method);

    class_bits | method_bits
}
297
298pub fn heartbeat() -> AMQPFrame {
299    AMQPFrame::Heartbeat(0)
300}