1use std::collections::HashMap;
2
/// Channel number a frame travels on.
pub type Channel = u16;
/// Packed class/method identifier: class id in the upper 16 bits, method id in
/// the lower 16 bits (the layout `split_class_method` decodes).
pub type ClassMethod = u32;
/// Class id half of a [`ClassMethod`].
pub type ClassId = u16;
/// Type of the `weight` field of a content header frame.
pub type Weight = u16;

// Connection class (class id 0x000A).
pub const CONNECTION_START: ClassMethod = 0x000A_000A;
pub const CONNECTION_START_OK: ClassMethod = 0x000A_000B;
pub const CONNECTION_TUNE: ClassMethod = 0x000A_001E;
pub const CONNECTION_TUNE_OK: ClassMethod = 0x000A_001F;
pub const CONNECTION_OPEN: ClassMethod = 0x000A_0028;
pub const CONNECTION_OPEN_OK: ClassMethod = 0x000A_0029;
pub const CONNECTION_CLOSE: ClassMethod = 0x000A_0032;
pub const CONNECTION_CLOSE_OK: ClassMethod = 0x000A_0033;

// Channel class (class id 0x0014).
pub const CHANNEL_OPEN: ClassMethod = 0x0014_000A;
pub const CHANNEL_OPEN_OK: ClassMethod = 0x0014_000B;
pub const CHANNEL_CLOSE: ClassMethod = 0x0014_0028;
pub const CHANNEL_CLOSE_OK: ClassMethod = 0x0014_0029;

// Exchange class (class id 0x0028).
pub const EXCHANGE_DECLARE: ClassMethod = 0x0028_000A;
pub const EXCHANGE_DECLARE_OK: ClassMethod = 0x0028_000B;

// Queue class (class id 0x0032).
pub const QUEUE_DECLARE: ClassMethod = 0x0032_000A;
pub const QUEUE_DECLARE_OK: ClassMethod = 0x0032_000B;
pub const QUEUE_BIND: ClassMethod = 0x0032_0014;
pub const QUEUE_BIND_OK: ClassMethod = 0x0032_0015;

// Basic class (class id 0x003C).
pub const BASIC_CONSUME: ClassMethod = 0x003C_0014;
pub const BASIC_CONSUME_OK: ClassMethod = 0x003C_0015;
pub const BASIC_PUBLISH: ClassMethod = 0x003C_0028;
pub const BASIC_DELIVER: ClassMethod = 0x003C_003C;
34
/// One AMQP frame as modeled by this module.
#[derive(Debug)]
pub enum AMQPFrame {
    /// Header frame; carries no payload in this representation.
    /// NOTE(review): presumably the protocol header — confirm against the codec.
    Header,
    /// Method frame: channel, packed class/method identifier, and the method's arguments.
    Method(Channel, ClassMethod, MethodFrameArgs),
    /// Content header frame.
    ContentHeader(ContentHeaderFrame),
    /// Content body frame.
    ContentBody(ContentBodyFrame),
    /// Heartbeat frame, tagged with a channel.
    Heartbeat(Channel),
}
44
/// Arguments of a method frame, one variant per supported method.
///
/// Variants without a payload represent methods that carry no arguments in
/// this model.
#[derive(Debug)]
pub enum MethodFrameArgs {
    ConnectionStart(ConnectionStartArgs),
    ConnectionStartOk(ConnectionStartOkArgs),
    ConnectionTune(ConnectionTuneArgs),
    ConnectionTuneOk(ConnectionTuneOkArgs),
    ConnectionOpen(ConnectionOpenArgs),
    ConnectionOpenOk,
    ConnectionClose(ConnectionCloseArgs),
    ConnectionCloseOk,
    ChannelOpen,
    ChannelOpenOk,
    ChannelClose(ChannelCloseArgs),
    ChannelCloseOk,
    ExchangeDeclare(ExchangeDeclareArgs),
    ExchangeDeclareOk,
    // NOTE(review): no class/method constant or constructor function for the
    // two exchange-bind variants is defined in this file.
    ExchangeBind(ExchangeBindArgs),
    ExchangeBindOk,
    QueueDeclare(QueueDeclareArgs),
    QueueDeclareOk(QueueDeclareOkArgs),
    QueueBind(QueueBindArgs),
    QueueBindOk,
    BasicConsume(BasicConsumeArgs),
    BasicConsumeOk(BasicConsumeOkArgs),
    BasicDeliver(BasicDeliverArgs),
    BasicPublish(BasicPublishArgs)
}
73
74#[derive(Debug)]
75pub struct ContentHeaderFrame {
76    pub channel: Channel,
77    pub class_id: ClassId,
78    pub weight: Weight,
79    pub body_size: u64,
80    pub prop_flags: u16, pub args: Vec<AMQPValue>,
82}
83
/// Content body frame: a raw payload tagged with its channel.
#[derive(Debug)]
pub struct ContentBodyFrame {
    /// Channel the frame belongs to.
    pub channel: Channel,
    /// Raw body bytes.
    pub body: Vec<u8>,
}
89
/// Field table: string keys mapped to [`AMQPFieldValue`]s (tables may nest).
pub type FieldTable = HashMap<String, AMQPFieldValue>;
92
/// A single typed value; used for the `args` of a [`ContentHeaderFrame`].
#[derive(Debug)]
pub enum AMQPValue {
    U8(u8),
    U16(u16),
    U32(u32),
    U64(u64),
    // NOTE(review): SimpleString vs LongString presumably differ only in their
    // wire-level length prefix — confirm against the codec.
    SimpleString(String),
    LongString(String),
    /// A field table with no entries.
    EmptyFieldTable,
    /// A populated field table (boxed).
    FieldTable(Box<FieldTable>),
}
105
/// Value stored under a key of a [`FieldTable`].
#[derive(Debug)]
pub enum AMQPFieldValue {
    Bool(bool),
    LongString(String),
    /// A nested field table with no entries.
    EmptyFieldTable,
    /// A nested, populated field table (boxed because the type is recursive).
    FieldTable(Box<FieldTable>),
}
114
/// Arguments of the `ConnectionStart` method.
#[derive(Debug, Default)]
pub struct ConnectionStartArgs {
    pub version_major: u8,
    pub version_minor: u8,
    /// `connection_start` leaves this `None` and nests the capabilities table
    /// inside `properties` instead.
    pub capabilities: Option<FieldTable>,
    /// Server properties table, when present.
    pub properties: Option<FieldTable>,
    /// `connection_start` sets this to `"PLAIN"`.
    pub mechanisms: String,
    /// `connection_start` sets this to `"en_US"`.
    pub locales: String,
}
124
/// Arguments of the `ConnectionStartOk` method.
#[derive(Debug, Default)]
pub struct ConnectionStartOkArgs {
    /// `connection_start_ok` leaves this `None` and nests the capabilities
    /// table inside `properties` instead.
    pub capabilities: Option<FieldTable>,
    /// Client properties table, when present.
    pub properties: Option<FieldTable>,
    /// `connection_start_ok` sets this to `"PLAIN"`.
    pub mechanism: String,
    /// Authentication response; `connection_start_ok` fills it with
    /// NUL + username + NUL + password.
    pub response: String,
    /// `connection_start_ok` sets this to `"en_US"`.
    pub locale: String,
}
133
/// Arguments of the `ConnectionTune` method.
///
/// `connection_tune` fills these with 2047 / 131072 / 60.
#[derive(Debug, Default)]
pub struct ConnectionTuneArgs {
    pub channel_max: u16,
    pub frame_max: u32,
    // NOTE(review): unit presumably seconds — confirm against the protocol spec.
    pub heartbeat: u16,
}
140
/// Arguments of the `ConnectionTuneOk` method; same fields as [`ConnectionTuneArgs`].
///
/// `connection_tune_ok` fills these with 2047 / 131072 / 60.
#[derive(Debug, Default)]
pub struct ConnectionTuneOkArgs {
    pub channel_max: u16,
    pub frame_max: u32,
    // NOTE(review): unit presumably seconds — confirm against the protocol spec.
    pub heartbeat: u16,
}
147
/// Arguments of the `ConnectionOpen` method.
#[derive(Debug, Default)]
pub struct ConnectionOpenArgs {
    /// Name of the virtual host to open.
    pub virtual_host: String,
    /// `connection_open` always sets this to `true`.
    pub insist: bool,
}
153
/// Arguments of the `ConnectionClose` method.
#[derive(Debug, Default)]
pub struct ConnectionCloseArgs {
    /// Close code; `connection_close` uses 200.
    pub code: u16,
    /// Human-readable close text.
    pub text: String,
    // NOTE(review): class_id/method_id presumably identify the method that
    // caused the close; `connection_close` sets both to 0.
    pub class_id: u16,
    pub method_id: u16,
}
161
/// Arguments of the `ChannelClose` method; same fields as [`ConnectionCloseArgs`].
#[derive(Debug, Default)]
pub struct ChannelCloseArgs {
    /// Close code.
    pub code: u16,
    /// Human-readable close text.
    pub text: String,
    // NOTE(review): class_id/method_id presumably identify the method that
    // caused the close — confirm against callers of `channel_close`.
    pub class_id: u16,
    pub method_id: u16,
}
169
170bitflags! {
171    pub struct ExchangeDeclareFlags: u8 {
172        const PASSIVE = 0b00000001;
173        const DURABLE = 0b00000010;
174        const AUTO_DELETE = 0b00000100;
175        const INTERNAL = 0b00001000;
176        const NO_WAIT = 0b00010000;
177    }
178}
179
180impl Default for ExchangeDeclareFlags {
181    fn default() -> Self {
182        ExchangeDeclareFlags::empty()
183    }
184}
185
/// Arguments of the `ExchangeDeclare` method.
#[derive(Debug, Default)]
pub struct ExchangeDeclareArgs {
    pub exchange_name: String,
    pub exchange_type: String,
    /// Declaration options; `exchange_declare` passes the empty set.
    pub flags: ExchangeDeclareFlags,
    /// Optional extra arguments table; `exchange_declare` passes `None`.
    pub args: Option<FieldTable>,
}
193
/// Arguments of the `ExchangeBind` method.
#[derive(Debug, Default)]
pub struct ExchangeBindArgs {
    pub source: String,
    pub destination: String,
    pub routing_key: String,
    pub no_wait: bool,
    /// Optional extra arguments table.
    pub args: Option<FieldTable>,
}
202
203bitflags! {
204    pub struct QueueDeclareFlags: u8 {
205        const PASSIVE = 0b00000001;
206        const DURABLE = 0b00000010;
207        const EXCLUSIVE = 0b00000100;
208        const AUTO_DELETE = 0b00001000;
209        const NO_WAIT = 0b00010000;
210    }
211}
212
213impl Default for QueueDeclareFlags {
214    fn default() -> Self {
215        QueueDeclareFlags::empty()
216    }
217}
218
/// Arguments of the `QueueDeclare` method.
#[derive(Debug, Default)]
pub struct QueueDeclareArgs {
    /// Name of the queue.
    pub name: String,
    /// Declaration options; `queue_declare` passes the empty set.
    pub flags: QueueDeclareFlags,
    /// Optional extra arguments table; `queue_declare` passes `None`.
    pub args: Option<FieldTable>,
}
225
/// Arguments of the `QueueDeclareOk` method.
#[derive(Debug, Default)]
pub struct QueueDeclareOkArgs {
    /// Name of the queue.
    pub name: String,
    pub message_count: u32,
    pub consumer_count: u32,
}
232
/// Arguments of the `QueueBind` method.
#[derive(Debug, Default)]
pub struct QueueBindArgs {
    pub queue_name: String,
    pub exchange_name: String,
    pub routing_key: String,
    /// `queue_bind` always sets this to `false`.
    pub no_wait: bool,
    /// Optional extra arguments table; `queue_bind` passes `None`.
    pub args: Option<FieldTable>,
}
241
242bitflags! {
243    pub struct BasicConsumeFlags: u8 {
244        const NO_LOCAL = 0b00000001;
245        const NO_ACK = 0b00000010;
246        const EXCLUSIVE = 0b00000100;
247        const NO_WAIT = 0b00001000;
248    }
249}
250
251impl Default for BasicConsumeFlags {
252    fn default() -> Self {
253        BasicConsumeFlags::NO_ACK
254    }
255}
256
/// Arguments of the `BasicConsume` method.
#[derive(Debug, Default)]
pub struct BasicConsumeArgs {
    /// Name of the queue to consume from.
    pub queue: String,
    pub consumer_tag: String,
    /// Consume options; defaults to `BasicConsumeFlags::NO_ACK`.
    pub flags: BasicConsumeFlags,
    /// Optional extra arguments table; `basic_consume` passes `None`.
    pub args: Option<FieldTable>,
}
264
/// Arguments of the `BasicConsumeOk` method.
#[derive(Debug, Default)]
pub struct BasicConsumeOkArgs {
    pub consumer_tag: String,
}
269
/// Arguments of the `BasicDeliver` method.
#[derive(Debug, Default)]
pub struct BasicDeliverArgs {
    pub consumer_tag: String,
    pub delivery_tag: u64,
    pub redelivered: bool,
    pub exchange_name: String,
    pub routing_key: String,
}
278
279bitflags! {
280    pub struct BasicPublishFlags: u8 {
281        const MANDATORY = 0b00000001;
282        const IMMEDIATE = 0b00000010;
283    }
284}
285
286impl Default for BasicPublishFlags {
287    fn default() -> Self {
288        BasicPublishFlags::empty()
289    }
290}
291
/// Arguments of the `BasicPublish` method.
#[derive(Debug, Default)]
pub struct BasicPublishArgs {
    pub exchange_name: String,
    pub routing_key: String,
    /// Publish options; `basic_publish` passes the empty set.
    pub flags: BasicPublishFlags
}
298
299impl From<ContentHeaderFrame> for AMQPFrame {
300    fn from(chf: ContentHeaderFrame) -> AMQPFrame {
301        AMQPFrame::ContentHeader(chf)
302    }
303}
304
305impl From<ContentBodyFrame> for AMQPFrame {
306    fn from(cbf: ContentBodyFrame) -> AMQPFrame {
307        AMQPFrame::ContentBody(cbf)
308    }
309}
310
/// Splits a packed class/method identifier into `(class_id, method_id)`.
///
/// The class id is the upper 16 bits of `cm`; the method id is the lower 16 bits.
pub fn split_class_method(cm: u32) -> (u16, u16) {
    // Casting to `u16` keeps exactly the low 16 bits, so no explicit mask is needed.
    ((cm >> 16) as u16, cm as u16)
}
318
319pub fn connection_start(channel: u16) -> AMQPFrame {
320    let mut capabilities = FieldTable::new();
321
322    capabilities.insert("publisher_confirms".into(), AMQPFieldValue::Bool(true));
323    capabilities.insert(
324        "exchange_exchange_bindings".into(),
325        AMQPFieldValue::Bool(true),
326    );
327    capabilities.insert("basic.nack".into(), AMQPFieldValue::Bool(true));
328    capabilities.insert("consumer_cancel_notify".into(), AMQPFieldValue::Bool(true));
329    capabilities.insert("connection.blocked".into(), AMQPFieldValue::Bool(true));
330    capabilities.insert("consumer_priorities".into(), AMQPFieldValue::Bool(true));
331    capabilities.insert(
332        "authentication_failure_close".into(),
333        AMQPFieldValue::Bool(true),
334    );
335    capabilities.insert("per_consumer_qos".into(), AMQPFieldValue::Bool(true));
336    capabilities.insert("direct_reply_to".into(), AMQPFieldValue::Bool(true));
337
338    let mut server_properties = FieldTable::new();
339
340    server_properties.insert(
341        "capabilities".into(),
342        AMQPFieldValue::FieldTable(Box::new(capabilities)),
343    );
344    server_properties.insert(
345        "product".into(),
346        AMQPFieldValue::LongString("IronMQ server".into()),
347    );
348    server_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
349
350    AMQPFrame::Method(
351        channel,
352        CONNECTION_START,
353        MethodFrameArgs::ConnectionStart(ConnectionStartArgs {
354            version_major: 0,
355            version_minor: 9,
356            capabilities: None,
357            properties: Some(server_properties),
358            mechanisms: "PLAIN".into(),
359            locales: "en_US".into()
360        }))
361}
362
363pub fn connection_start_ok(username: &str, password: &str, capabilities: FieldTable) -> AMQPFrame {
365    let mut client_properties = FieldTable::new();
366
367    client_properties.insert(
368        "product".into(),
369        AMQPFieldValue::LongString("ironmq-client".into()),
370    );
371    client_properties.insert("platform".into(), AMQPFieldValue::LongString("Rust".into()));
372    client_properties.insert(
373        "capabilities".into(),
374        AMQPFieldValue::FieldTable(Box::new(capabilities)),
375    );
376    client_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
378
379    let mut auth = Vec::<u8>::new();
380    auth.push(0x00);
381    auth.extend_from_slice(username.as_bytes());
382    auth.push(0x00);
383    auth.extend_from_slice(password.as_bytes());
384
385    let auth_string = String::from_utf8(auth).unwrap();
386
387    AMQPFrame::Method(
388        0,
389        CONNECTION_START_OK,
390        MethodFrameArgs::ConnectionStartOk(ConnectionStartOkArgs {
391            capabilities: None,
392            properties: Some(client_properties),
393            mechanism: "PLAIN".into(),
394            response: auth_string,
395            locale: "en_US".into()
396        }))
397}
398
399pub fn connection_tune(channel: u16) -> AMQPFrame {
400    AMQPFrame::Method(
401        channel,
402        CONNECTION_TUNE,
403        MethodFrameArgs::ConnectionTune(ConnectionTuneArgs {
404            channel_max: 2047,
405            frame_max: 131_072,
406            heartbeat: 60
407        }))
408}
409
410pub fn connection_tune_ok(channel: u16) -> AMQPFrame {
411    AMQPFrame::Method(
412        channel,
413        CONNECTION_TUNE_OK,
414        MethodFrameArgs::ConnectionTuneOk(ConnectionTuneOkArgs {
415            channel_max: 2047,
416            frame_max: 131_072,
417            heartbeat: 60
418        }))
419}
420
421pub fn connection_open(channel: u16, virtual_host: String) -> AMQPFrame {
422    AMQPFrame::Method(
423        channel,
424        CONNECTION_OPEN,
425        MethodFrameArgs::ConnectionOpen(ConnectionOpenArgs {
426            virtual_host: virtual_host,
427            insist: true
428        }))
429}
430
431pub fn connection_open_ok(channel: u16) -> AMQPFrame {
432    AMQPFrame::Method(
433        channel,
434        CONNECTION_OPEN_OK,
435        MethodFrameArgs::ConnectionOpenOk
436    )
437}
438
439pub fn connection_close(channel: u16) -> AMQPFrame {
440    AMQPFrame::Method(
441        channel,
442        CONNECTION_CLOSE,
443        MethodFrameArgs::ConnectionClose(ConnectionCloseArgs {
444            code: 200,
445            text: "Normal shutdown".into(),
446            class_id: 0,
447            method_id: 0
448        }))
449}
450
451pub fn connection_close_ok(channel: u16) -> AMQPFrame {
452    AMQPFrame::Method(
453        channel,
454        CONNECTION_CLOSE_OK,
455        MethodFrameArgs::ConnectionCloseOk
456        )
457}
458
459pub fn channel_open(channel: u16) -> AMQPFrame {
460    AMQPFrame::Method(
461        channel,
462        CHANNEL_OPEN,
463        MethodFrameArgs::ChannelOpen
464    )
465}
466
467pub fn channel_open_ok(channel: u16) -> AMQPFrame {
468    AMQPFrame::Method(
469        channel,
470        CHANNEL_OPEN_OK,
471        MethodFrameArgs::ChannelOpenOk
472    )
473}
474
475pub fn channel_close(channel: Channel, code: u16, text: &str, class_id: u16, method_id: u16) -> AMQPFrame {
476    AMQPFrame::Method(
477        channel,
478        CHANNEL_CLOSE,
479        MethodFrameArgs::ChannelClose(ChannelCloseArgs {
480            code: code,
481            text: text.into(),
482            class_id: class_id,
483            method_id: method_id
484        }))
485}
486
487pub fn channel_close_ok(channel: Channel) -> AMQPFrame {
488    AMQPFrame::Method(channel, CHANNEL_CLOSE_OK, MethodFrameArgs::ChannelCloseOk)
489}
490
491pub fn exchange_declare(channel: u16, exchange_name: String, exchange_type: String) -> AMQPFrame {
492    AMQPFrame::Method(
493        channel,
494        EXCHANGE_DECLARE,
495        MethodFrameArgs::ExchangeDeclare(ExchangeDeclareArgs {
496            exchange_name: exchange_name,
497            exchange_type: exchange_type,
498            flags: ExchangeDeclareFlags::empty(),
499            args: None
500        }))
501}
502
503pub fn exchange_declare_ok(channel: u16) -> AMQPFrame {
504    AMQPFrame::Method(
505        channel,
506        EXCHANGE_DECLARE_OK,
507        MethodFrameArgs::ExchangeDeclareOk
508    )
509}
510
511pub fn queue_bind(
512    channel: u16,
513    queue_name: String,
514    exchange_name: String,
515    routing_key: String,
516) -> AMQPFrame {
517    AMQPFrame::Method(
518        channel,
519        QUEUE_BIND,
520        MethodFrameArgs::QueueBind(QueueBindArgs {
521            queue_name: queue_name,
522            exchange_name: exchange_name,
523            routing_key: routing_key,
524            no_wait: false,
525            args: None
526        }))
527}
528
529pub fn queue_bind_ok(channel: u16) -> AMQPFrame {
530    AMQPFrame::Method(
531        channel,
532        QUEUE_BIND_OK,
533        MethodFrameArgs::QueueBindOk
534    )
535}
536
537pub fn queue_declare(channel: u16, queue_name: String) -> AMQPFrame {
538    AMQPFrame::Method(
539        channel,
540        QUEUE_DECLARE,
541        MethodFrameArgs::QueueDeclare(QueueDeclareArgs {
542            name: queue_name,
543            flags: QueueDeclareFlags::empty(),
544            args: None
545        }))
546}
547
548pub fn queue_declare_ok(
549    channel: u16,
550    queue_name: String,
551    message_count: u32,
552    consumer_count: u32,
553) -> AMQPFrame {
554    AMQPFrame::Method(
555        channel,
556        QUEUE_DECLARE_OK,
557        MethodFrameArgs::QueueDeclareOk(QueueDeclareOkArgs {
558            name: queue_name,
559            message_count: message_count,
560            consumer_count: consumer_count
561        }))
562}
563
564pub fn basic_consume(channel: u16, queue_name: String, consumer_tag: String) -> AMQPFrame {
565    AMQPFrame::Method(
566        channel,
567        BASIC_CONSUME,
568        MethodFrameArgs::BasicConsume(BasicConsumeArgs {
569            queue: queue_name,
570            consumer_tag: consumer_tag,
571            flags: BasicConsumeFlags::default(),
572            args: None
573        }))
574}
575
576pub fn basic_consume_ok(channel: u16, consumer_tag: String) -> AMQPFrame {
577    AMQPFrame::Method(
578        channel,
579        BASIC_CONSUME_OK,
580        MethodFrameArgs::BasicConsumeOk(BasicConsumeOkArgs {
581            consumer_tag: consumer_tag
582        }))
583}
584
585pub fn basic_deliver(
586    channel: u16,
587    consumer_tag: String,
588    delivery_tag: u64,
589    redelivered: bool,
590    exchange_name: String,
591    routing_key: String,
592) -> AMQPFrame {
593    AMQPFrame::Method(
594        channel,
595        BASIC_DELIVER,
596        MethodFrameArgs::BasicDeliver(BasicDeliverArgs {
597            consumer_tag: consumer_tag,
598            delivery_tag: delivery_tag,
599            redelivered: redelivered,
600            exchange_name: exchange_name,
601            routing_key: routing_key
602        })
603    )
604}
605
606pub fn basic_publish(channel: u16, exchange_name: String, routing_key: String) -> AMQPFrame {
607    AMQPFrame::Method(
608        channel,
609        BASIC_PUBLISH,
610        MethodFrameArgs::BasicPublish(BasicPublishArgs {
611            exchange_name: exchange_name,
612            routing_key: routing_key,
613            flags: BasicPublishFlags::empty()
614        })
615    )
616}
617
618pub fn content_header(channel: u16, size: u64) -> ContentHeaderFrame {
619    ContentHeaderFrame {
620        channel: channel,
621        class_id: 0x003C,
622        weight: 0,
623        body_size: size,
624        prop_flags: 0x0000,
625        args: vec![],
626    }
627}
628
629pub fn content_body(channel: u16, payload: &[u8]) -> ContentBodyFrame {
630    ContentBodyFrame {
631        channel: channel,
632        body: payload.to_vec(),
633    }
634}