1use std::collections::HashMap;
2
// Connection class: upper 16 bits are 0x000A, lower 16 bits identify the method
// (the same split that `split_class_method` performs).
pub const CONNECTION_START: u32 = 0x000A_000A;
pub const CONNECTION_START_OK: u32 = 0x000A_000B;
pub const CONNECTION_TUNE: u32 = 0x000A_001E;
pub const CONNECTION_TUNE_OK: u32 = 0x000A_001F;
pub const CONNECTION_OPEN: u32 = 0x000A_0028;
pub const CONNECTION_OPEN_OK: u32 = 0x000A_0029;
pub const CONNECTION_CLOSE: u32 = 0x000A_0032;
pub const CONNECTION_CLOSE_OK: u32 = 0x000A_0033;
11
// Channel class: upper 16 bits are 0x0014, lower 16 bits identify the method.
pub const CHANNEL_OPEN: u32 = 0x0014_000A;
pub const CHANNEL_OPEN_OK: u32 = 0x0014_000B;
pub const CHANNEL_CLOSE: u32 = 0x0014_0028;
pub const CHANNEL_CLOSE_OK: u32 = 0x0014_0029;
16
// Exchange class: upper 16 bits are 0x0028, lower 16 bits identify the method.
pub const EXCHANGE_DECLARE: u32 = 0x0028_000A;
pub const EXCHANGE_DECLARE_OK: u32 = 0x0028_000B;
19
// Queue class: upper 16 bits are 0x0032, lower 16 bits identify the method.
pub const QUEUE_DECLARE: u32 = 0x0032_000A;
pub const QUEUE_DECLARE_OK: u32 = 0x0032_000B;
pub const QUEUE_BIND: u32 = 0x0032_0014;
pub const QUEUE_BIND_OK: u32 = 0x0032_0015;
24
// Basic class: upper 16 bits are 0x003C, lower 16 bits identify the method.
pub const BASIC_CONSUME: u32 = 0x003C_0014;
pub const BASIC_CONSUME_OK: u32 = 0x003C_0015;
pub const BASIC_PUBLISH: u32 = 0x003C_0028;
pub const BASIC_DELIVER: u32 = 0x003C_003C;
29
/// Channel number attached to a frame.
pub type Channel = u16;
/// Class id and method id packed into a single 32-bit value (see `split_class_method`).
pub type ClassMethod = u32;
/// Type of the `class_id` field of `ContentHeaderFrame`.
pub type ClassId = u16;
/// Type of the `weight` field of `ContentHeaderFrame`.
pub type Weight = u16;
34
/// A single frame, with one variant per frame kind modelled in this file.
#[derive(Debug)]
pub enum AMQPFrame {
    /// Header frame; carries no data in this representation.
    Header,
    /// Method frame: channel, packed class/method id, and the method's arguments.
    Method(Channel, ClassMethod, MethodFrameArgs),
    /// Content header frame.
    ContentHeader(ContentHeaderFrame),
    /// Content body frame.
    ContentBody(ContentBodyFrame),
    /// Heartbeat frame for the given channel.
    Heartbeat(Channel),
}
44
/// Arguments of a method frame, one variant per supported method.
///
/// Variants without a payload carry no argument data.
#[derive(Debug)]
pub enum MethodFrameArgs {
    // Connection methods.
    ConnectionStart(ConnectionStartArgs),
    ConnectionStartOk(ConnectionStartOkArgs),
    ConnectionTune(ConnectionTuneArgs),
    ConnectionTuneOk(ConnectionTuneOkArgs),
    ConnectionOpen(ConnectionOpenArgs),
    ConnectionOpenOk,
    ConnectionClose(ConnectionCloseArgs),
    ConnectionCloseOk,
    // Channel methods.
    ChannelOpen,
    ChannelOpenOk,
    ChannelClose(ChannelCloseArgs),
    ChannelCloseOk,
    // Exchange methods.
    ExchangeDeclare(ExchangeDeclareArgs),
    ExchangeDeclareOk,
    // NOTE(review): no matching EXCHANGE_BIND / EXCHANGE_BIND_OK constants are visible
    // among the class/method constants in this file; confirm whether they are needed.
    ExchangeBind(ExchangeBindArgs),
    ExchangeBindOk,
    // Queue methods.
    QueueDeclare(QueueDeclareArgs),
    QueueDeclareOk(QueueDeclareOkArgs),
    QueueBind(QueueBindArgs),
    QueueBindOk,
    // Basic methods.
    BasicConsume(BasicConsumeArgs),
    BasicConsumeOk(BasicConsumeOkArgs),
    BasicDeliver(BasicDeliverArgs),
    BasicPublish(BasicPublishArgs)
}
73
/// Content header frame data.
#[derive(Debug)]
pub struct ContentHeaderFrame {
    /// Channel the frame belongs to.
    pub channel: Channel,
    /// Class id of the content (`content_header` sets it to `0x003C`).
    pub class_id: ClassId,
    /// Weight field (`content_header` sets it to 0).
    pub weight: Weight,
    /// Declared size of the body.
    pub body_size: u64,
    /// Property flag bits; presumably they indicate which entries of `args` are present
    /// -- TODO confirm against the codec.
    pub prop_flags: u16,
    /// Property values carried by the header.
    pub args: Vec<AMQPValue>,
}
83
/// Content body frame data.
#[derive(Debug)]
pub struct ContentBodyFrame {
    /// Channel the frame belongs to.
    pub channel: Channel,
    /// Raw body bytes.
    pub body: Vec<u8>,
}
89
/// Table mapping string keys to field values; values may themselves be nested tables.
pub type FieldTable = HashMap<String, AMQPFieldValue>;
92
/// A value as used in `ContentHeaderFrame::args`.
#[derive(Debug)]
pub enum AMQPValue {
    /// Unsigned 8-bit integer.
    U8(u8),
    /// Unsigned 16-bit integer.
    U16(u16),
    /// Unsigned 32-bit integer.
    U32(u32),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// String value; presumably the short-string wire form -- TODO confirm against the codec.
    SimpleString(String),
    /// String value; presumably the long-string wire form -- TODO confirm against the codec.
    LongString(String),
    /// A field table with no entries.
    EmptyFieldTable,
    /// A boxed field table.
    FieldTable(Box<FieldTable>),
}
105
/// A value stored under a key in a `FieldTable`.
#[derive(Debug)]
pub enum AMQPFieldValue {
    /// Boolean value.
    Bool(bool),
    /// String value.
    LongString(String),
    /// A nested field table with no entries.
    EmptyFieldTable,
    /// A nested, boxed field table.
    FieldTable(Box<FieldTable>),
}
114
/// Arguments carried by `MethodFrameArgs::ConnectionStart`.
#[derive(Debug, Default)]
pub struct ConnectionStartArgs {
    /// Major version number (`connection_start` sends 0).
    pub version_major: u8,
    /// Minor version number (`connection_start` sends 9).
    pub version_minor: u8,
    /// Optional capabilities table (`connection_start` leaves it `None` and nests the
    /// capabilities inside `properties` instead).
    pub capabilities: Option<FieldTable>,
    /// Optional properties table.
    pub properties: Option<FieldTable>,
    /// Mechanisms string (`connection_start` sends `"PLAIN"`).
    pub mechanisms: String,
    /// Locales string (`connection_start` sends `"en_US"`).
    pub locales: String,
}
124
/// Arguments carried by `MethodFrameArgs::ConnectionStartOk`.
#[derive(Debug, Default)]
pub struct ConnectionStartOkArgs {
    /// Optional capabilities table (`connection_start_ok` leaves it `None`).
    pub capabilities: Option<FieldTable>,
    /// Optional properties table.
    pub properties: Option<FieldTable>,
    /// Selected mechanism (`connection_start_ok` sends `"PLAIN"`).
    pub mechanism: String,
    /// Authentication response string.
    pub response: String,
    /// Selected locale (`connection_start_ok` sends `"en_US"`).
    pub locale: String,
}
133
/// Arguments carried by `MethodFrameArgs::ConnectionTune`.
#[derive(Debug, Default)]
pub struct ConnectionTuneArgs {
    /// Channel maximum (`connection_tune` sends 2047).
    pub channel_max: u16,
    /// Frame maximum (`connection_tune` sends 131_072).
    pub frame_max: u32,
    /// Heartbeat value (`connection_tune` sends 60); unit not stated here.
    pub heartbeat: u16,
}
140
/// Arguments carried by `MethodFrameArgs::ConnectionTuneOk`.
#[derive(Debug, Default)]
pub struct ConnectionTuneOkArgs {
    /// Channel maximum (`connection_tune_ok` sends 2047).
    pub channel_max: u16,
    /// Frame maximum (`connection_tune_ok` sends 131_072).
    pub frame_max: u32,
    /// Heartbeat value (`connection_tune_ok` sends 60); unit not stated here.
    pub heartbeat: u16,
}
147
/// Arguments carried by `MethodFrameArgs::ConnectionOpen`.
#[derive(Debug, Default)]
pub struct ConnectionOpenArgs {
    /// Name of the virtual host to open.
    pub virtual_host: String,
    /// Insist flag (`connection_open` always sends `true`).
    pub insist: bool,
}
153
/// Arguments carried by `MethodFrameArgs::ConnectionClose`.
#[derive(Debug, Default)]
pub struct ConnectionCloseArgs {
    /// Numeric close code (`connection_close` sends 200).
    pub code: u16,
    /// Human-readable close text.
    pub text: String,
    /// Class id associated with the close (`connection_close` sends 0).
    pub class_id: u16,
    /// Method id associated with the close (`connection_close` sends 0).
    pub method_id: u16,
}
161
/// Arguments carried by `MethodFrameArgs::ChannelClose`.
#[derive(Debug, Default)]
pub struct ChannelCloseArgs {
    /// Numeric close code.
    pub code: u16,
    /// Human-readable close text.
    pub text: String,
    /// Class id associated with the close.
    pub class_id: u16,
    /// Method id associated with the close.
    pub method_id: u16,
}
169
170bitflags! {
171 pub struct ExchangeDeclareFlags: u8 {
172 const PASSIVE = 0b00000001;
173 const DURABLE = 0b00000010;
174 const AUTO_DELETE = 0b00000100;
175 const INTERNAL = 0b00001000;
176 const NO_WAIT = 0b00010000;
177 }
178}
179
180impl Default for ExchangeDeclareFlags {
181 fn default() -> Self {
182 ExchangeDeclareFlags::empty()
183 }
184}
185
/// Arguments carried by `MethodFrameArgs::ExchangeDeclare`.
#[derive(Debug, Default)]
pub struct ExchangeDeclareArgs {
    /// Name of the exchange.
    pub exchange_name: String,
    /// Type of the exchange, as a string.
    pub exchange_type: String,
    /// Declaration flags.
    pub flags: ExchangeDeclareFlags,
    /// Optional extra arguments table.
    pub args: Option<FieldTable>,
}
193
/// Arguments carried by `MethodFrameArgs::ExchangeBind`.
#[derive(Debug, Default)]
pub struct ExchangeBindArgs {
    /// Source of the binding.
    pub source: String,
    /// Destination of the binding.
    pub destination: String,
    /// Routing key of the binding.
    pub routing_key: String,
    /// No-wait flag.
    pub no_wait: bool,
    /// Optional extra arguments table.
    pub args: Option<FieldTable>,
}
202
203bitflags! {
204 pub struct QueueDeclareFlags: u8 {
205 const PASSIVE = 0b00000001;
206 const DURABLE = 0b00000010;
207 const EXCLUSIVE = 0b00000100;
208 const AUTO_DELETE = 0b00001000;
209 const NO_WAIT = 0b00010000;
210 }
211}
212
213impl Default for QueueDeclareFlags {
214 fn default() -> Self {
215 QueueDeclareFlags::empty()
216 }
217}
218
/// Arguments carried by `MethodFrameArgs::QueueDeclare`.
#[derive(Debug, Default)]
pub struct QueueDeclareArgs {
    /// Name of the queue.
    pub name: String,
    /// Declaration flags.
    pub flags: QueueDeclareFlags,
    /// Optional extra arguments table.
    pub args: Option<FieldTable>,
}
225
/// Arguments carried by `MethodFrameArgs::QueueDeclareOk`.
#[derive(Debug, Default)]
pub struct QueueDeclareOkArgs {
    /// Name of the queue.
    pub name: String,
    /// Message count reported for the queue.
    pub message_count: u32,
    /// Consumer count reported for the queue.
    pub consumer_count: u32,
}
232
/// Arguments carried by `MethodFrameArgs::QueueBind`.
#[derive(Debug, Default)]
pub struct QueueBindArgs {
    /// Name of the queue being bound.
    pub queue_name: String,
    /// Name of the exchange the queue is bound to.
    pub exchange_name: String,
    /// Routing key of the binding.
    pub routing_key: String,
    /// No-wait flag (`queue_bind` sends `false`).
    pub no_wait: bool,
    /// Optional extra arguments table.
    pub args: Option<FieldTable>,
}
241
242bitflags! {
243 pub struct BasicConsumeFlags: u8 {
244 const NO_LOCAL = 0b00000001;
245 const NO_ACK = 0b00000010;
246 const EXCLUSIVE = 0b00000100;
247 const NO_WAIT = 0b00001000;
248 }
249}
250
251impl Default for BasicConsumeFlags {
252 fn default() -> Self {
253 BasicConsumeFlags::NO_ACK
254 }
255}
256
/// Arguments carried by `MethodFrameArgs::BasicConsume`.
#[derive(Debug, Default)]
pub struct BasicConsumeArgs {
    /// Name of the queue to consume from.
    pub queue: String,
    /// Tag identifying the consumer.
    pub consumer_tag: String,
    /// Consume flags.
    pub flags: BasicConsumeFlags,
    /// Optional extra arguments table.
    pub args: Option<FieldTable>,
}
264
/// Arguments carried by `MethodFrameArgs::BasicConsumeOk`.
#[derive(Debug, Default)]
pub struct BasicConsumeOkArgs {
    /// Tag identifying the consumer.
    pub consumer_tag: String,
}
269
/// Arguments carried by `MethodFrameArgs::BasicDeliver`.
#[derive(Debug, Default)]
pub struct BasicDeliverArgs {
    /// Tag identifying the consumer.
    pub consumer_tag: String,
    /// Tag identifying the delivery.
    pub delivery_tag: u64,
    /// Redelivered flag.
    pub redelivered: bool,
    /// Name of the exchange.
    pub exchange_name: String,
    /// Routing key.
    pub routing_key: String,
}
278
279bitflags! {
280 pub struct BasicPublishFlags: u8 {
281 const MANDATORY = 0b00000001;
282 const IMMEDIATE = 0b00000010;
283 }
284}
285
286impl Default for BasicPublishFlags {
287 fn default() -> Self {
288 BasicPublishFlags::empty()
289 }
290}
291
/// Arguments carried by `MethodFrameArgs::BasicPublish`.
#[derive(Debug, Default)]
pub struct BasicPublishArgs {
    /// Name of the exchange to publish to.
    pub exchange_name: String,
    /// Routing key.
    pub routing_key: String,
    /// Publish flags.
    pub flags: BasicPublishFlags
}
298
299impl From<ContentHeaderFrame> for AMQPFrame {
300 fn from(chf: ContentHeaderFrame) -> AMQPFrame {
301 AMQPFrame::ContentHeader(chf)
302 }
303}
304
305impl From<ContentBodyFrame> for AMQPFrame {
306 fn from(cbf: ContentBodyFrame) -> AMQPFrame {
307 AMQPFrame::ContentBody(cbf)
308 }
309}
310
/// Splits a packed class/method value into `(class_id, method_id)`.
///
/// The class id is the upper 16 bits of `cm` and the method id is the lower 16 bits.
pub fn split_class_method(cm: u32) -> (u16, u16) {
    // Big-endian byte order puts the two class bytes first and the two method bytes last.
    let [class_hi, class_lo, method_hi, method_lo] = cm.to_be_bytes();

    (
        u16::from_be_bytes([class_hi, class_lo]),
        u16::from_be_bytes([method_hi, method_lo]),
    )
}
318
319pub fn connection_start(channel: u16) -> AMQPFrame {
320 let mut capabilities = FieldTable::new();
321
322 capabilities.insert("publisher_confirms".into(), AMQPFieldValue::Bool(true));
323 capabilities.insert(
324 "exchange_exchange_bindings".into(),
325 AMQPFieldValue::Bool(true),
326 );
327 capabilities.insert("basic.nack".into(), AMQPFieldValue::Bool(true));
328 capabilities.insert("consumer_cancel_notify".into(), AMQPFieldValue::Bool(true));
329 capabilities.insert("connection.blocked".into(), AMQPFieldValue::Bool(true));
330 capabilities.insert("consumer_priorities".into(), AMQPFieldValue::Bool(true));
331 capabilities.insert(
332 "authentication_failure_close".into(),
333 AMQPFieldValue::Bool(true),
334 );
335 capabilities.insert("per_consumer_qos".into(), AMQPFieldValue::Bool(true));
336 capabilities.insert("direct_reply_to".into(), AMQPFieldValue::Bool(true));
337
338 let mut server_properties = FieldTable::new();
339
340 server_properties.insert(
341 "capabilities".into(),
342 AMQPFieldValue::FieldTable(Box::new(capabilities)),
343 );
344 server_properties.insert(
345 "product".into(),
346 AMQPFieldValue::LongString("IronMQ server".into()),
347 );
348 server_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
349
350 AMQPFrame::Method(
351 channel,
352 CONNECTION_START,
353 MethodFrameArgs::ConnectionStart(ConnectionStartArgs {
354 version_major: 0,
355 version_minor: 9,
356 capabilities: None,
357 properties: Some(server_properties),
358 mechanisms: "PLAIN".into(),
359 locales: "en_US".into()
360 }))
361}
362
363pub fn connection_start_ok(username: &str, password: &str, capabilities: FieldTable) -> AMQPFrame {
365 let mut client_properties = FieldTable::new();
366
367 client_properties.insert(
368 "product".into(),
369 AMQPFieldValue::LongString("ironmq-client".into()),
370 );
371 client_properties.insert("platform".into(), AMQPFieldValue::LongString("Rust".into()));
372 client_properties.insert(
373 "capabilities".into(),
374 AMQPFieldValue::FieldTable(Box::new(capabilities)),
375 );
376 client_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
378
379 let mut auth = Vec::<u8>::new();
380 auth.push(0x00);
381 auth.extend_from_slice(username.as_bytes());
382 auth.push(0x00);
383 auth.extend_from_slice(password.as_bytes());
384
385 let auth_string = String::from_utf8(auth).unwrap();
386
387 AMQPFrame::Method(
388 0,
389 CONNECTION_START_OK,
390 MethodFrameArgs::ConnectionStartOk(ConnectionStartOkArgs {
391 capabilities: None,
392 properties: Some(client_properties),
393 mechanism: "PLAIN".into(),
394 response: auth_string,
395 locale: "en_US".into()
396 }))
397}
398
399pub fn connection_tune(channel: u16) -> AMQPFrame {
400 AMQPFrame::Method(
401 channel,
402 CONNECTION_TUNE,
403 MethodFrameArgs::ConnectionTune(ConnectionTuneArgs {
404 channel_max: 2047,
405 frame_max: 131_072,
406 heartbeat: 60
407 }))
408}
409
410pub fn connection_tune_ok(channel: u16) -> AMQPFrame {
411 AMQPFrame::Method(
412 channel,
413 CONNECTION_TUNE_OK,
414 MethodFrameArgs::ConnectionTuneOk(ConnectionTuneOkArgs {
415 channel_max: 2047,
416 frame_max: 131_072,
417 heartbeat: 60
418 }))
419}
420
421pub fn connection_open(channel: u16, virtual_host: String) -> AMQPFrame {
422 AMQPFrame::Method(
423 channel,
424 CONNECTION_OPEN,
425 MethodFrameArgs::ConnectionOpen(ConnectionOpenArgs {
426 virtual_host: virtual_host,
427 insist: true
428 }))
429}
430
431pub fn connection_open_ok(channel: u16) -> AMQPFrame {
432 AMQPFrame::Method(
433 channel,
434 CONNECTION_OPEN_OK,
435 MethodFrameArgs::ConnectionOpenOk
436 )
437}
438
439pub fn connection_close(channel: u16) -> AMQPFrame {
440 AMQPFrame::Method(
441 channel,
442 CONNECTION_CLOSE,
443 MethodFrameArgs::ConnectionClose(ConnectionCloseArgs {
444 code: 200,
445 text: "Normal shutdown".into(),
446 class_id: 0,
447 method_id: 0
448 }))
449}
450
451pub fn connection_close_ok(channel: u16) -> AMQPFrame {
452 AMQPFrame::Method(
453 channel,
454 CONNECTION_CLOSE_OK,
455 MethodFrameArgs::ConnectionCloseOk
456 )
457}
458
459pub fn channel_open(channel: u16) -> AMQPFrame {
460 AMQPFrame::Method(
461 channel,
462 CHANNEL_OPEN,
463 MethodFrameArgs::ChannelOpen
464 )
465}
466
467pub fn channel_open_ok(channel: u16) -> AMQPFrame {
468 AMQPFrame::Method(
469 channel,
470 CHANNEL_OPEN_OK,
471 MethodFrameArgs::ChannelOpenOk
472 )
473}
474
475pub fn channel_close(channel: Channel, code: u16, text: &str, class_id: u16, method_id: u16) -> AMQPFrame {
476 AMQPFrame::Method(
477 channel,
478 CHANNEL_CLOSE,
479 MethodFrameArgs::ChannelClose(ChannelCloseArgs {
480 code: code,
481 text: text.into(),
482 class_id: class_id,
483 method_id: method_id
484 }))
485}
486
487pub fn channel_close_ok(channel: Channel) -> AMQPFrame {
488 AMQPFrame::Method(channel, CHANNEL_CLOSE_OK, MethodFrameArgs::ChannelCloseOk)
489}
490
491pub fn exchange_declare(channel: u16, exchange_name: String, exchange_type: String) -> AMQPFrame {
492 AMQPFrame::Method(
493 channel,
494 EXCHANGE_DECLARE,
495 MethodFrameArgs::ExchangeDeclare(ExchangeDeclareArgs {
496 exchange_name: exchange_name,
497 exchange_type: exchange_type,
498 flags: ExchangeDeclareFlags::empty(),
499 args: None
500 }))
501}
502
503pub fn exchange_declare_ok(channel: u16) -> AMQPFrame {
504 AMQPFrame::Method(
505 channel,
506 EXCHANGE_DECLARE_OK,
507 MethodFrameArgs::ExchangeDeclareOk
508 )
509}
510
511pub fn queue_bind(
512 channel: u16,
513 queue_name: String,
514 exchange_name: String,
515 routing_key: String,
516) -> AMQPFrame {
517 AMQPFrame::Method(
518 channel,
519 QUEUE_BIND,
520 MethodFrameArgs::QueueBind(QueueBindArgs {
521 queue_name: queue_name,
522 exchange_name: exchange_name,
523 routing_key: routing_key,
524 no_wait: false,
525 args: None
526 }))
527}
528
529pub fn queue_bind_ok(channel: u16) -> AMQPFrame {
530 AMQPFrame::Method(
531 channel,
532 QUEUE_BIND_OK,
533 MethodFrameArgs::QueueBindOk
534 )
535}
536
537pub fn queue_declare(channel: u16, queue_name: String) -> AMQPFrame {
538 AMQPFrame::Method(
539 channel,
540 QUEUE_DECLARE,
541 MethodFrameArgs::QueueDeclare(QueueDeclareArgs {
542 name: queue_name,
543 flags: QueueDeclareFlags::empty(),
544 args: None
545 }))
546}
547
548pub fn queue_declare_ok(
549 channel: u16,
550 queue_name: String,
551 message_count: u32,
552 consumer_count: u32,
553) -> AMQPFrame {
554 AMQPFrame::Method(
555 channel,
556 QUEUE_DECLARE_OK,
557 MethodFrameArgs::QueueDeclareOk(QueueDeclareOkArgs {
558 name: queue_name,
559 message_count: message_count,
560 consumer_count: consumer_count
561 }))
562}
563
564pub fn basic_consume(channel: u16, queue_name: String, consumer_tag: String) -> AMQPFrame {
565 AMQPFrame::Method(
566 channel,
567 BASIC_CONSUME,
568 MethodFrameArgs::BasicConsume(BasicConsumeArgs {
569 queue: queue_name,
570 consumer_tag: consumer_tag,
571 flags: BasicConsumeFlags::default(),
572 args: None
573 }))
574}
575
576pub fn basic_consume_ok(channel: u16, consumer_tag: String) -> AMQPFrame {
577 AMQPFrame::Method(
578 channel,
579 BASIC_CONSUME_OK,
580 MethodFrameArgs::BasicConsumeOk(BasicConsumeOkArgs {
581 consumer_tag: consumer_tag
582 }))
583}
584
585pub fn basic_deliver(
586 channel: u16,
587 consumer_tag: String,
588 delivery_tag: u64,
589 redelivered: bool,
590 exchange_name: String,
591 routing_key: String,
592) -> AMQPFrame {
593 AMQPFrame::Method(
594 channel,
595 BASIC_DELIVER,
596 MethodFrameArgs::BasicDeliver(BasicDeliverArgs {
597 consumer_tag: consumer_tag,
598 delivery_tag: delivery_tag,
599 redelivered: redelivered,
600 exchange_name: exchange_name,
601 routing_key: routing_key
602 })
603 )
604}
605
606pub fn basic_publish(channel: u16, exchange_name: String, routing_key: String) -> AMQPFrame {
607 AMQPFrame::Method(
608 channel,
609 BASIC_PUBLISH,
610 MethodFrameArgs::BasicPublish(BasicPublishArgs {
611 exchange_name: exchange_name,
612 routing_key: routing_key,
613 flags: BasicPublishFlags::empty()
614 })
615 )
616}
617
618pub fn content_header(channel: u16, size: u64) -> ContentHeaderFrame {
619 ContentHeaderFrame {
620 channel: channel,
621 class_id: 0x003C,
622 weight: 0,
623 body_size: size,
624 prop_flags: 0x0000,
625 args: vec![],
626 }
627}
628
629pub fn content_body(channel: u16, payload: &[u8]) -> ContentBodyFrame {
630 ContentBodyFrame {
631 channel: channel,
632 body: payload.to_vec(),
633 }
634}