1mod basic;
2mod channel;
3mod connection;
4mod exchange;
5mod queue;
6
7pub use self::{
8 basic::{
9 basic_get_empty, confirm_select, confirm_select_ok, BasicAckArgs, BasicCancelArgs, BasicCancelOkArgs,
10 BasicConsumeArgs, BasicConsumeFlags, BasicConsumeOkArgs, BasicDeliverArgs, BasicGetArgs, BasicGetOkArgs,
11 BasicPublishArgs, BasicPublishFlags, BasicRejectArgs, BasicReturnArgs, ConfirmSelectArgs,
12 },
13 channel::{channel_close, channel_close_ok, channel_open, channel_open_ok, ChannelCloseArgs},
14 connection::{
15 connection_close, connection_close_ok, connection_open_ok, connection_tune, connection_tune_ok,
16 ConnectionCloseArgs, ConnectionOpenArgs, ConnectionStartArgs, ConnectionStartOkArgs, ConnectionTuneArgs,
17 ConnectionTuneOkArgs,
18 },
19 exchange::{
20 exchange_declare_ok, exchange_delete_ok, ExchangeDeclareArgs, ExchangeDeclareFlags, ExchangeDeleteArgs,
21 ExchangeDeleteFlags,
22 },
23 queue::{
24 queue_bind_ok, QueueBindArgs, QueueDeclareArgs, QueueDeclareFlags, QueueDeclareOkArgs, QueueDeleteArgs,
25 QueueDeleteFlags, QueueDeleteOkArgs, QueuePurgeArgs, QueuePurgeOkArgs, QueueUnbindArgs,
26 },
27};
28use std::collections::HashMap;
29
// Connection class identifiers (class id 10 = 0x000A). Each value keeps the
// class id in its upper 16 bits and the method id in its lower 16 bits, which
// is the layout `split_class_method` and `unify_class_method` operate on.

/// Class 10, method 10.
pub const CONNECTION_START: u32 = 0x000A_000A;
/// Class 10, method 11.
pub const CONNECTION_START_OK: u32 = 0x000A_000B;
/// Class 10, method 20.
// NOTE(review): AMQP 0-9-1 appears to call methods 20/21 `secure`/`secure-ok`;
// confirm that the `SECRET` naming is intentional.
pub const CONNECTION_SECRET: u32 = 0x000A_0014;
/// Class 10, method 21.
pub const CONNECTION_SECRET_OK: u32 = 0x000A_0015;
/// Class 10, method 30.
pub const CONNECTION_TUNE: u32 = 0x000A_001E;
/// Class 10, method 31.
pub const CONNECTION_TUNE_OK: u32 = 0x000A_001F;
/// Class 10, method 40.
pub const CONNECTION_OPEN: u32 = 0x000A_0028;
/// Class 10, method 41.
pub const CONNECTION_OPEN_OK: u32 = 0x000A_0029;
/// Class 10, method 50.
pub const CONNECTION_CLOSE: u32 = 0x000A_0032;
/// Class 10, method 51.
pub const CONNECTION_CLOSE_OK: u32 = 0x000A_0033;
40
// Channel class identifiers (class id 20 = 0x0014 in the upper 16 bits,
// method id in the lower 16 bits).

/// Class 20, method 10.
pub const CHANNEL_OPEN: u32 = 0x0014_000A;
/// Class 20, method 11.
pub const CHANNEL_OPEN_OK: u32 = 0x0014_000B;
/// Class 20, method 20.
pub const CHANNEL_FLOW: u32 = 0x0014_0014;
/// Class 20, method 21.
pub const CHANNEL_FLOW_OK: u32 = 0x0014_0015;
/// Class 20, method 40.
pub const CHANNEL_CLOSE: u32 = 0x0014_0028;
/// Class 20, method 41.
pub const CHANNEL_CLOSE_OK: u32 = 0x0014_0029;
47
// Exchange class identifiers (class id 40 = 0x0028 in the upper 16 bits,
// method id in the lower 16 bits).

/// Class 40, method 10.
pub const EXCHANGE_DECLARE: u32 = 0x0028_000A;
/// Class 40, method 11.
pub const EXCHANGE_DECLARE_OK: u32 = 0x0028_000B;
/// Class 40, method 20.
pub const EXCHANGE_DELETE: u32 = 0x0028_0014;
/// Class 40, method 21.
pub const EXCHANGE_DELETE_OK: u32 = 0x0028_0015;
52
// Queue class identifiers (class id 50 = 0x0032 in the upper 16 bits,
// method id in the lower 16 bits).

/// Class 50, method 10.
pub const QUEUE_DECLARE: u32 = 0x0032_000A;
/// Class 50, method 11.
pub const QUEUE_DECLARE_OK: u32 = 0x0032_000B;
/// Class 50, method 20.
pub const QUEUE_BIND: u32 = 0x0032_0014;
/// Class 50, method 21.
pub const QUEUE_BIND_OK: u32 = 0x0032_0015;
/// Class 50, method 30.
pub const QUEUE_PURGE: u32 = 0x0032_001E;
/// Class 50, method 31.
pub const QUEUE_PURGE_OK: u32 = 0x0032_001F;
/// Class 50, method 40.
pub const QUEUE_DELETE: u32 = 0x0032_0028;
/// Class 50, method 41.
pub const QUEUE_DELETE_OK: u32 = 0x0032_0029;
/// Class 50, method 50.
pub const QUEUE_UNBIND: u32 = 0x0032_0032;
/// Class 50, method 51.
pub const QUEUE_UNBIND_OK: u32 = 0x0032_0033;
63
// Basic class identifiers (class id 60 = 0x003C in the upper 16 bits,
// method id in the lower 16 bits).

/// Class 60, method 10.
pub const BASIC_QOS: u32 = 0x003C_000A;
/// Class 60, method 11.
pub const BASIC_QOS_OK: u32 = 0x003C_000B;
/// Class 60, method 20.
pub const BASIC_CONSUME: u32 = 0x003C_0014;
/// Class 60, method 21.
pub const BASIC_CONSUME_OK: u32 = 0x003C_0015;
/// Class 60, method 30.
pub const BASIC_CANCEL: u32 = 0x003C_001E;
/// Class 60, method 31.
pub const BASIC_CANCEL_OK: u32 = 0x003C_001F;
/// Class 60, method 40.
pub const BASIC_PUBLISH: u32 = 0x003C_0028;
/// Class 60, method 50.
pub const BASIC_RETURN: u32 = 0x003C_0032;
/// Class 60, method 60.
pub const BASIC_DELIVER: u32 = 0x003C_003C;
/// Class 60, method 70.
pub const BASIC_GET: u32 = 0x003C_0046;
/// Class 60, method 71.
pub const BASIC_GET_OK: u32 = 0x003C_0047;
/// Class 60, method 72.
pub const BASIC_GET_EMPTY: u32 = 0x003C_0048;
/// Class 60, method 80.
pub const BASIC_ACK: u32 = 0x003C_0050;
/// Class 60, method 90.
pub const BASIC_REJECT: u32 = 0x003C_005A;
/// Class 60, method 100.
pub const BASIC_RECOVER_ASYNC: u32 = 0x003C_0064;
/// Class 60, method 110.
pub const BASIC_RECOVER: u32 = 0x003C_006E;
/// Class 60, method 111.
pub const BASIC_RECOVER_OK: u32 = 0x003C_006F;
81
// Confirm class identifiers (class id 85 = 0x0055 in the upper 16 bits,
// method id in the lower 16 bits).

/// Class 85, method 10.
pub const CONFIRM_SELECT: u32 = 0x0055_000A;
/// Class 85, method 11.
pub const CONFIRM_SELECT_OK: u32 = 0x0055_000B;
84
// 16-bit aliases.

/// Channel number a frame is associated with.
pub type Channel = u16;
/// Class identifier on its own, as stored in a content header.
pub type ClassId = u16;
/// Weight field of a content header.
pub type Weight = u16;

// 32-bit alias.

/// Combined class and method identifier: class id in the upper 16 bits,
/// method id in the lower 16 bits.
pub type ClassMethod = u32;
89
90pub enum AMQPFrame {
92 Header,
94 Method(Channel, ClassMethod, MethodFrameArgs),
98 ContentHeader(ContentHeaderFrame),
99 ContentBody(ContentBodyFrame),
100 Heartbeat(Channel),
101}
102
103impl std::fmt::Debug for AMQPFrame {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 match self {
106 AMQPFrame::Header => write!(f, "Header"),
107 AMQPFrame::Method(ch, cm, args) => write!(f, "Method(channel={}, {:08X}, {:?})", ch, cm, args),
108 AMQPFrame::ContentHeader(ch) => write!(f, "ContentHeader({:?})", ch),
109 AMQPFrame::ContentBody(cb) => write!(f, "ContentBody({:?})", cb),
110 AMQPFrame::Heartbeat(_) => write!(f, "Heartbeat"),
111 }
112 }
113}
114
115#[derive(Debug)]
117pub enum MethodFrameArgs {
118 ConnectionStart(ConnectionStartArgs),
119 ConnectionStartOk(ConnectionStartOkArgs),
120 ConnectionTune(ConnectionTuneArgs),
121 ConnectionTuneOk(ConnectionTuneOkArgs),
122 ConnectionOpen(ConnectionOpenArgs),
123 ConnectionOpenOk,
124 ConnectionClose(ConnectionCloseArgs),
125 ConnectionCloseOk,
126 ChannelOpen,
127 ChannelOpenOk,
128 ChannelClose(ChannelCloseArgs),
129 ChannelCloseOk,
130 ExchangeDeclare(ExchangeDeclareArgs),
131 ExchangeDeclareOk,
132 ExchangeDelete(ExchangeDeleteArgs),
133 ExchangeDeleteOk,
134 QueueDeclare(QueueDeclareArgs),
135 QueueDeclareOk(QueueDeclareOkArgs),
136 QueueBind(QueueBindArgs),
137 QueueBindOk,
138 QueuePurge(QueuePurgeArgs),
139 QueuePurgeOk(QueuePurgeOkArgs),
140 QueueDelete(QueueDeleteArgs),
141 QueueDeleteOk(QueueDeleteOkArgs),
142 QueueUnbind(QueueUnbindArgs),
143 QueueUnbindOk,
144 BasicConsume(BasicConsumeArgs),
145 BasicConsumeOk(BasicConsumeOkArgs),
146 BasicCancel(BasicCancelArgs),
147 BasicCancelOk(BasicCancelOkArgs),
148 BasicGet(BasicGetArgs),
149 BasicGetOk(BasicGetOkArgs),
150 BasicGetEmpty,
151 BasicPublish(BasicPublishArgs),
152 BasicReturn(BasicReturnArgs),
153 BasicDeliver(BasicDeliverArgs),
154 BasicAck(BasicAckArgs),
155 BasicReject(BasicRejectArgs),
156 ConfirmSelect(ConfirmSelectArgs),
157 ConfirmSelectOk,
158}
159
160bitflags! {
161 pub struct HeaderPropertyFlags: u16 {
162 const CLUSTER_ID = 0b0000_0000_0000_0100;
163 const APP_ID = 0b0000_0000_0000_1000;
164 const USER_ID = 0b0000_0000_0001_0000;
165 const MESSAGE_TYPE = 0b0000_0000_0010_0000;
166 const TIMESTAMP = 0b0000_0000_0100_0000;
167 const MESSAGE_ID = 0b0000_0000_1000_0000;
168 const EXPIRATION = 0b0000_0001_0000_0000;
169 const REPLY_TO = 0b0000_0010_0000_0000;
170 const CORRELATION_ID = 0b0000_0100_0000_0000;
171 const PRIORITY = 0b0000_1000_0000_0000;
172 const DELIVERY_MODE = 0b0001_0000_0000_0000;
173 const HEADERS = 0b0010_0000_0000_0000;
174 const CONTENT_ENCODING = 0b0100_0000_0000_0000;
175 const CONTENT_TYPE = 0b1000_0000_0000_0000;
176 }
177}
178
179impl Default for HeaderPropertyFlags {
180 fn default() -> Self {
181 HeaderPropertyFlags::empty()
182 }
183}
184
185#[derive(Debug, Default)]
189pub struct ContentHeaderFrame {
190 pub channel: Channel,
191 pub class_id: ClassId,
192 pub weight: Weight,
193 pub body_size: u64,
194 pub prop_flags: HeaderPropertyFlags,
195 pub cluster_id: Option<String>,
196 pub app_id: Option<String>,
197 pub user_id: Option<String>,
198 pub message_type: Option<String>,
199 pub timestamp: Option<u64>,
200 pub message_id: Option<String>,
201 pub expiration: Option<String>,
202 pub reply_to: Option<String>,
203 pub correlation_id: Option<String>,
204 pub priority: Option<u8>,
205 pub delivery_mode: Option<u8>,
206 pub headers: Option<FieldTable>,
207 pub content_encoding: Option<String>,
208 pub content_type: Option<String>,
209}
210
211impl ContentHeaderFrame {
212 pub fn with_content_type(&mut self, content_type: String) -> &ContentHeaderFrame {
213 self.content_type = Some(content_type);
214 self.prop_flags.set(HeaderPropertyFlags::CONTENT_TYPE, true);
215 self
216 }
217
218 pub fn frame(self) -> AMQPFrame {
219 AMQPFrame::ContentHeader(self)
220 }
221}
222
223pub struct ContentBodyFrame {
224 pub channel: Channel,
225 pub body: Vec<u8>,
229}
230
231impl ContentBodyFrame {
232 pub fn frame(self) -> AMQPFrame {
233 AMQPFrame::ContentBody(self)
234 }
235}
236
237impl std::fmt::Debug for ContentBodyFrame {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 let body = String::from_utf8_lossy(&self.body[..std::cmp::min(64usize, self.body.len())]);
240
241 f.write_fmt(format_args!(
242 "ContentBodyFrame {{ channel: {}, body: \"{}\" }}",
243 &self.channel, body
244 ))
245 }
246}
247
248pub type FieldTable = HashMap<String, AMQPFieldValue>;
250
251#[derive(Debug)]
252pub enum AMQPValue {
253 U8(u8),
255 U16(u16),
256 U32(u32),
257 U64(u64),
258 SimpleString(String),
259 LongString(String),
260 EmptyFieldTable,
261 FieldTable(Box<FieldTable>),
262}
263
264#[derive(Clone, Debug, PartialEq)]
265pub enum AMQPFieldValue {
266 Bool(bool),
267 LongString(String),
269 EmptyFieldTable,
270 FieldTable(Box<FieldTable>),
271}
272
273impl From<ContentHeaderFrame> for AMQPFrame {
274 fn from(chf: ContentHeaderFrame) -> AMQPFrame {
275 AMQPFrame::ContentHeader(chf)
276 }
277}
278
279impl From<ContentBodyFrame> for AMQPFrame {
280 fn from(cbf: ContentBodyFrame) -> AMQPFrame {
281 AMQPFrame::ContentBody(cbf)
282 }
283}
284
/// Splits a combined class/method identifier into `(class_id, method_id)`.
///
/// The class id is read from the upper 16 bits of `cm` and the method id from
/// the lower 16 bits. This is the inverse of `unify_class_method`.
///
/// Declared `const` so identifiers can also be taken apart in constant
/// contexts; ordinary runtime calls are unaffected.
pub const fn split_class_method(cm: u32) -> (u16, u16) {
    // After the shift only 16 significant bits remain, so the cast is lossless.
    let class_id = (cm >> 16) as u16;
    // The mask keeps the low half only, which makes the narrowing explicit.
    let method_id = (cm & 0x0000_FFFF) as u16;

    (class_id, method_id)
}
292
/// Combines a class id and a method id into one 32-bit identifier.
///
/// `class` fills the upper 16 bits of the result and `method` the lower 16
/// bits. This is the inverse of `split_class_method`.
///
/// Declared `const` so identifiers can also be composed in constant contexts;
/// ordinary runtime calls are unaffected.
pub const fn unify_class_method(class: u16, method: u16) -> u32 {
    // Both casts widen u16 to u32, so no bits are lost.
    ((class as u32) << 16) | (method as u32)
}
297
298pub fn heartbeat() -> AMQPFrame {
299 AMQPFrame::Heartbeat(0)
300}