metalmq_codec/
codec.rs

1use crate::frame::*;
2use bytes::{Buf, BufMut, BytesMut};
3use std::collections::HashMap;
4use tokio_util::codec::{Decoder, Encoder};
5
// Frame type octets: the first byte of a frame, which the decoder dispatches on.
const FRAME_METHOD_FRAME: u8 = 0x01;
const FRAME_CONTENT_HEADER: u8 = 0x02;
const FRAME_CONTENT_BODY: u8 = 0x03;
const FRAME_HEARTBEAT: u8 = 0x08;
// 0x41 is ASCII 'A', the first byte of the `AMQP....` protocol header. Unlike
// the frames above, the protocol header carries no length field.
const FRAME_AMQP_VERSION: u8 = 0x41;
11
/// Codec translating between raw bytes and AMQP [`Frame`]s.
///
/// The type carries no state; it only exists so that the `Encoder<Frame>` and
/// `Decoder` implementations in this file have a type to be attached to.
pub struct AMQPCodec {}
14
/// Unit of data passed through [`AMQPCodec`].
///
/// The encoder accepts both variants. The decoder in this file only ever
/// produces [`Frame::Frame`].
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Frame {
    /// A single AMQP frame.
    Frame(AMQPFrame),
    /// Several AMQP frames, encoded back to back into the same buffer.
    Frames(Vec<AMQPFrame>),
}
21
22impl Encoder<Frame> for AMQPCodec {
23    type Error = std::io::Error;
24
25    fn encode(&mut self, event: Frame, buf: &mut BytesMut) -> Result<(), Self::Error> {
26        match event {
27            Frame::Frame(frame) => encode_amqp_frame(buf, frame),
28            Frame::Frames(frames) => {
29                for frame in frames {
30                    encode_amqp_frame(buf, frame);
31                }
32            }
33        }
34
35        Ok(())
36    }
37}
38
39impl Decoder for AMQPCodec {
40    type Item = Frame;
41    type Error = std::io::Error;
42
43    // TODO here we can decode more frames until the buffer contains data
44    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
45        if src.len() < 7 || !is_full_frame(src) {
46            return Ok(None);
47        }
48
49        match src.get_u8() {
50            FRAME_METHOD_FRAME => {
51                let channel = src.get_u16();
52                // TODO amqp frame can be u32 but Buf handles only usize buffes
53                let frame_len = src.get_u32() as usize;
54
55                // TODO here there is a panic is the frame is not long enough!
56                // FIXME rewrite it to assert instead
57                if src.len() < frame_len + 1 {
58                    unreachable!();
59                }
60                let mut frame_buf = src.split_to(frame_len);
61                let frame = decode_method_frame(&mut frame_buf, channel);
62
63                let _frame_separator = src.get_u8();
64
65                Ok(Some(Frame::Frame(frame)))
66            }
67            FRAME_CONTENT_HEADER => {
68                let channel = src.get_u16();
69                let frame_len = src.get_u32() as usize;
70
71                let mut frame_buf = src.split_to(frame_len);
72                let frame = decode_content_header_frame(&mut frame_buf, channel);
73
74                let _frame_separator = src.get_u8();
75
76                Ok(Some(Frame::Frame(frame)))
77            }
78            FRAME_CONTENT_BODY => {
79                let channel = src.get_u16();
80                let body_len = src.get_u32();
81                let bytes = src.split_to(body_len as usize);
82
83                let _frame_separator = src.get_u8();
84
85                // TODO more effective copy
86                let frame = AMQPFrame::ContentBody(ContentBodyFrame {
87                    channel,
88                    body: bytes.to_vec(),
89                });
90
91                Ok(Some(Frame::Frame(frame)))
92            }
93            FRAME_HEARTBEAT => {
94                let channel = src.get_u16();
95                let len = src.get_u32();
96                let _ = src.split_to(len as usize);
97
98                let _frame_separator = src.get_u8();
99
100                Ok(Some(Frame::Frame(AMQPFrame::Heartbeat(channel))))
101            }
102            FRAME_AMQP_VERSION => {
103                let mut head = [0u8; 7];
104                src.copy_to_slice(&mut head);
105
106                // TODO check if version is 0091
107
108                Ok(Some(Frame::Frame(AMQPFrame::Header)))
109            }
110            f => Err(std::io::Error::new(
111                std::io::ErrorKind::Other,
112                format!("Unknown frame {}", f),
113            )),
114        }
115    }
116}
117
118/// Check if the buffer contains the full frame. We can do that easily since
119/// most of the time the frame contains the length information.
120fn is_full_frame(src: &BytesMut) -> bool {
121    match src[0] {
122        FRAME_AMQP_VERSION => src.len() >= 8,
123        _ => {
124            let mut bs = [0u8; 4];
125            bs.copy_from_slice(&src[3..7]);
126
127            let len = u32::from_be_bytes(bs) as usize;
128
129            src.len() >= len + 8
130        }
131    }
132}
133
134// TODO have an Error type here, and it should be result<>
135fn decode_method_frame(src: &mut BytesMut, channel: u16) -> AMQPFrame {
136    let class_method = src.get_u32();
137
138    let method_frame_args = match class_method {
139        CONNECTION_START => decode_connection_start(src),
140        CONNECTION_START_OK => decode_connection_start_ok(src),
141        CONNECTION_TUNE => decode_connection_tune(src),
142        CONNECTION_TUNE_OK => decode_connection_tune_ok(src),
143        CONNECTION_OPEN => decode_connection_open(src),
144        CONNECTION_OPEN_OK => decode_connection_open_ok(src),
145        CONNECTION_CLOSE => decode_connection_close(src),
146        CONNECTION_CLOSE_OK => MethodFrameArgs::ConnectionCloseOk,
147        CHANNEL_OPEN => decode_channel_open(src),
148        CHANNEL_OPEN_OK => decode_channel_open_ok(src),
149        CHANNEL_CLOSE => decode_channel_close(src),
150        CHANNEL_CLOSE_OK => MethodFrameArgs::ChannelCloseOk,
151        EXCHANGE_DECLARE => decode_exchange_declare(src),
152        EXCHANGE_DECLARE_OK => MethodFrameArgs::ExchangeDeclareOk,
153        EXCHANGE_DELETE => decode_exchange_delete(src),
154        EXCHANGE_DELETE_OK => MethodFrameArgs::ExchangeDeleteOk,
155        QUEUE_DECLARE => decode_queue_declare(src),
156        QUEUE_DECLARE_OK => decode_queue_declare_ok(src),
157        QUEUE_BIND => decode_queue_bind(src),
158        QUEUE_BIND_OK => MethodFrameArgs::QueueBindOk,
159        QUEUE_PURGE => decode_queue_purge(src),
160        QUEUE_PURGE_OK => decode_queue_purge_ok(src),
161        QUEUE_DELETE => decode_queue_delete(src),
162        QUEUE_DELETE_OK => decode_queue_delete_ok(src),
163        QUEUE_UNBIND => decode_queue_unbind(src),
164        QUEUE_UNBIND_OK => MethodFrameArgs::QueueUnbindOk,
165        BASIC_CONSUME => decode_basic_consume(src),
166        BASIC_CONSUME_OK => decode_basic_consume_ok(src),
167        BASIC_CANCEL => decode_basic_cancel(src),
168        BASIC_CANCEL_OK => decode_basic_cancel_ok(src),
169        BASIC_GET => decode_basic_get(src),
170        BASIC_GET_OK => decode_basic_get_ok(src),
171        BASIC_GET_EMPTY => decode_basic_get_empty(src),
172        BASIC_PUBLISH => decode_basic_publish(src),
173        BASIC_RETURN => decode_basic_return(src),
174        BASIC_DELIVER => decode_basic_deliver(src),
175        BASIC_ACK => decode_basic_ack(src),
176        BASIC_REJECT => decode_basic_reject(src),
177        CONFIRM_SELECT => decode_confirm_select(src),
178        CONFIRM_SELECT_OK => MethodFrameArgs::ConfirmSelectOk,
179        _ => unimplemented!("{:08X}", class_method),
180    };
181
182    AMQPFrame::Method(channel, class_method, method_frame_args)
183}
184
185fn decode_connection_start(src: &mut BytesMut) -> MethodFrameArgs {
186    let args = ConnectionStartArgs {
187        version_major: src.get_u8(),
188        version_minor: src.get_u8(),
189        properties: decode_field_table(src),
190        mechanisms: decode_long_string(src),
191        locales: decode_long_string(src),
192        ..Default::default()
193    };
194
195    //if let Some(ref table) = args.properties {
196    //    if let Some(AMQPFieldValue::FieldTable(cap)) = table.get("capabilities".into()) {
197    //        args.capabilities = Some(**cap.clone());
198    //    }
199    //}
200
201    MethodFrameArgs::ConnectionStart(args)
202}
203
204fn decode_connection_start_ok(src: &mut BytesMut) -> MethodFrameArgs {
205    let args = ConnectionStartOkArgs {
206        properties: decode_field_table(src),
207        mechanism: decode_short_string(src),
208        response: decode_long_string(src),
209        locale: decode_short_string(src),
210        ..Default::default()
211    };
212
213    // TODO init capabilities!
214
215    MethodFrameArgs::ConnectionStartOk(args)
216}
217
218fn decode_connection_tune(src: &mut BytesMut) -> MethodFrameArgs {
219    let args = ConnectionTuneArgs {
220        channel_max: src.get_u16(),
221        frame_max: src.get_u32(),
222        heartbeat: src.get_u16(),
223    };
224
225    MethodFrameArgs::ConnectionTune(args)
226}
227
228fn decode_connection_tune_ok(src: &mut BytesMut) -> MethodFrameArgs {
229    let args = ConnectionTuneOkArgs {
230        channel_max: src.get_u16(),
231        frame_max: src.get_u32(),
232        heartbeat: src.get_u16(),
233    };
234
235    MethodFrameArgs::ConnectionTuneOk(args)
236}
237
238fn decode_connection_open(src: &mut BytesMut) -> MethodFrameArgs {
239    let virtual_host = decode_short_string(src);
240    let _reserved = decode_short_string(src);
241    let flags = src.get_u8();
242
243    MethodFrameArgs::ConnectionOpen(ConnectionOpenArgs {
244        virtual_host,
245        insist: flags & 0x01 != 0,
246    })
247}
248
/// Decodes a `Connection.OpenOk` method.
///
/// The method carries a single short string which is consumed and discarded.
fn decode_connection_open_ok(src: &mut BytesMut) -> MethodFrameArgs {
    let _ = decode_short_string(src);

    MethodFrameArgs::ConnectionOpenOk
}
254
255fn decode_connection_close(src: &mut BytesMut) -> MethodFrameArgs {
256    let args = ConnectionCloseArgs {
257        code: src.get_u16(),
258        text: decode_short_string(src),
259        class_id: src.get_u16(),
260        method_id: src.get_u16(),
261    };
262
263    MethodFrameArgs::ConnectionClose(args)
264}
265
/// Decodes a `Channel.Open` method.
///
/// The method carries a single short string which is consumed and discarded.
fn decode_channel_open(src: &mut BytesMut) -> MethodFrameArgs {
    let _ = decode_short_string(src);

    MethodFrameArgs::ChannelOpen
}
271
/// Decodes a `Channel.OpenOk` method.
///
/// The method carries a single long string which is consumed and discarded.
fn decode_channel_open_ok(src: &mut BytesMut) -> MethodFrameArgs {
    let _ = decode_long_string(src);

    MethodFrameArgs::ChannelOpenOk
}
277
278fn decode_channel_close(src: &mut BytesMut) -> MethodFrameArgs {
279    let args = ChannelCloseArgs {
280        code: src.get_u16(),
281        text: decode_short_string(src),
282        class_id: src.get_u16(),
283        method_id: src.get_u16(),
284    };
285
286    MethodFrameArgs::ChannelClose(args)
287}
288
289fn decode_exchange_declare(src: &mut BytesMut) -> MethodFrameArgs {
290    let mut args = ExchangeDeclareArgs::default();
291    let _ = src.get_u16();
292    args.exchange_name = decode_short_string(src);
293    args.exchange_type = decode_short_string(src);
294    args.flags = ExchangeDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
295    args.args = decode_field_table(src);
296
297    MethodFrameArgs::ExchangeDeclare(args)
298}
299
300fn decode_exchange_delete(src: &mut BytesMut) -> MethodFrameArgs {
301    let mut args = ExchangeDeleteArgs::default();
302    let _ = src.get_u16();
303    args.exchange_name = decode_short_string(src);
304    args.flags = ExchangeDeleteFlags::from_bits(src.get_u8()).unwrap_or_default();
305
306    MethodFrameArgs::ExchangeDelete(args)
307}
308
309fn decode_queue_declare(src: &mut BytesMut) -> MethodFrameArgs {
310    let mut args = QueueDeclareArgs::default();
311    let _ = src.get_u16();
312    args.name = decode_short_string(src);
313    args.flags = QueueDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
314    args.args = decode_field_table(src);
315
316    MethodFrameArgs::QueueDeclare(args)
317}
318
319fn decode_queue_declare_ok(src: &mut BytesMut) -> MethodFrameArgs {
320    let args = QueueDeclareOkArgs {
321        name: decode_short_string(src),
322        message_count: src.get_u32(),
323        consumer_count: src.get_u32(),
324    };
325
326    MethodFrameArgs::QueueDeclareOk(args)
327}
328
329fn decode_queue_bind(src: &mut BytesMut) -> MethodFrameArgs {
330    let mut args = QueueBindArgs::default();
331    let _ = src.get_u16();
332    args.queue_name = decode_short_string(src);
333    args.exchange_name = decode_short_string(src);
334    args.routing_key = decode_short_string(src);
335
336    args.no_wait = src.get_u8() != 0;
337    args.args = decode_field_table(src);
338
339    MethodFrameArgs::QueueBind(args)
340}
341
342fn decode_queue_purge(src: &mut BytesMut) -> MethodFrameArgs {
343    let mut args = QueuePurgeArgs::default();
344    let _ = src.get_u16();
345    args.queue_name = decode_short_string(src);
346    args.no_wait = src.get_u8() != 0;
347
348    MethodFrameArgs::QueuePurge(args)
349}
350
351fn decode_queue_purge_ok(src: &mut BytesMut) -> MethodFrameArgs {
352    let mut args = QueuePurgeOkArgs::default();
353    args.message_count = src.get_u32();
354
355    MethodFrameArgs::QueuePurgeOk(args)
356}
357
358fn decode_queue_delete(src: &mut BytesMut) -> MethodFrameArgs {
359    let mut args = QueueDeleteArgs::default();
360    let _ = src.get_u16();
361    args.queue_name = decode_short_string(src);
362    args.flags = QueueDeleteFlags::from_bits(src.get_u8()).unwrap_or_default();
363
364    MethodFrameArgs::QueueDelete(args)
365}
366
367fn decode_queue_delete_ok(src: &mut BytesMut) -> MethodFrameArgs {
368    let args = QueueDeleteOkArgs {
369        message_count: src.get_u32(),
370    };
371
372    MethodFrameArgs::QueueDeleteOk(args)
373}
374
375fn decode_queue_unbind(src: &mut BytesMut) -> MethodFrameArgs {
376    let mut args = QueueUnbindArgs::default();
377    let _ = src.get_u16();
378    args.queue_name = decode_short_string(src);
379    args.exchange_name = decode_short_string(src);
380    args.routing_key = decode_short_string(src);
381    args.args = decode_field_table(src);
382
383    MethodFrameArgs::QueueUnbind(args)
384}
385
386fn decode_basic_consume(src: &mut BytesMut) -> MethodFrameArgs {
387    let mut args = BasicConsumeArgs::default();
388    let _ = src.get_u16();
389    args.queue = decode_short_string(src);
390    args.consumer_tag = decode_short_string(src);
391    args.flags = BasicConsumeFlags::from_bits(src.get_u8()).unwrap_or_default();
392    args.args = decode_field_table(src);
393
394    MethodFrameArgs::BasicConsume(args)
395}
396
397fn decode_basic_consume_ok(src: &mut BytesMut) -> MethodFrameArgs {
398    let args = BasicConsumeOkArgs {
399        consumer_tag: decode_short_string(src),
400    };
401
402    MethodFrameArgs::BasicConsumeOk(args)
403}
404
405fn decode_basic_cancel(src: &mut BytesMut) -> MethodFrameArgs {
406    let args = BasicCancelArgs {
407        consumer_tag: decode_short_string(src),
408        no_wait: src.get_u8() != 0,
409    };
410
411    MethodFrameArgs::BasicCancel(args)
412}
413
414fn decode_basic_cancel_ok(src: &mut BytesMut) -> MethodFrameArgs {
415    let args = BasicCancelOkArgs {
416        consumer_tag: decode_short_string(src),
417    };
418
419    MethodFrameArgs::BasicCancelOk(args)
420}
421
422fn decode_basic_get(src: &mut BytesMut) -> MethodFrameArgs {
423    let mut args = BasicGetArgs::default();
424    let _ = src.get_u16();
425
426    args.queue = decode_short_string(src);
427    args.no_ack = src.get_u8() != 0;
428
429    MethodFrameArgs::BasicGet(args)
430}
431
432fn decode_basic_get_ok(src: &mut BytesMut) -> MethodFrameArgs {
433    let mut args = BasicGetOkArgs::default();
434    args.delivery_tag = src.get_u64();
435    args.redelivered = src.get_u8() != 0;
436    args.exchange_name = decode_short_string(src);
437    args.routing_key = decode_short_string(src);
438    args.message_count = src.get_u32();
439
440    MethodFrameArgs::BasicGetOk(args)
441}
442
/// Decodes a `Basic.GetEmpty` method.
///
/// The method carries a single octet which is consumed and discarded.
fn decode_basic_get_empty(src: &mut BytesMut) -> MethodFrameArgs {
    let _ = src.get_u8();

    MethodFrameArgs::BasicGetEmpty
}
448
449fn decode_basic_publish(src: &mut BytesMut) -> MethodFrameArgs {
450    let mut args = BasicPublishArgs::default();
451    let _ = src.get_u16();
452    args.exchange_name = decode_short_string(src);
453    args.routing_key = decode_short_string(src);
454    args.flags = BasicPublishFlags::from_bits(src.get_u8()).unwrap_or_default();
455
456    MethodFrameArgs::BasicPublish(args)
457}
458
459fn decode_basic_return(src: &mut BytesMut) -> MethodFrameArgs {
460    let args = BasicReturnArgs {
461        reply_code: src.get_u16(),
462        reply_text: decode_short_string(src),
463        exchange_name: decode_short_string(src),
464        routing_key: decode_short_string(src),
465    };
466
467    MethodFrameArgs::BasicReturn(args)
468}
469
470fn decode_basic_deliver(src: &mut BytesMut) -> MethodFrameArgs {
471    let args = BasicDeliverArgs {
472        consumer_tag: decode_short_string(src),
473        delivery_tag: src.get_u64(),
474        redelivered: src.get_u8() != 0,
475        exchange_name: decode_short_string(src),
476        routing_key: decode_short_string(src),
477    };
478
479    MethodFrameArgs::BasicDeliver(args)
480}
481
482fn decode_basic_ack(src: &mut BytesMut) -> MethodFrameArgs {
483    let args = BasicAckArgs {
484        delivery_tag: src.get_u64(),
485        multiple: src.get_u8() != 0,
486    };
487
488    MethodFrameArgs::BasicAck(args)
489}
490
491fn decode_basic_reject(src: &mut BytesMut) -> MethodFrameArgs {
492    let args = BasicRejectArgs {
493        delivery_tag: src.get_u64(),
494        requeue: src.get_u8() != 0,
495    };
496
497    MethodFrameArgs::BasicReject(args)
498}
499
500fn decode_confirm_select(src: &mut BytesMut) -> MethodFrameArgs {
501    let args = ConfirmSelectArgs {
502        no_wait: src.get_u8() != 0,
503    };
504
505    MethodFrameArgs::ConfirmSelect(args)
506}
507
508fn decode_content_header_frame(src: &mut BytesMut, channel: u16) -> AMQPFrame {
509    let class_id = src.get_u16();
510    let weight = src.get_u16();
511    let body_size = src.get_u64();
512    let property_flags = HeaderPropertyFlags::from_bits(src.get_u16()).unwrap_or_default();
513
514    let content_type = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CONTENT_TYPE);
515    let content_encoding = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CONTENT_ENCODING);
516    let headers = decode_field_table_flag(src, property_flags, HeaderPropertyFlags::HEADERS);
517    let delivery_mode = if property_flags.contains(HeaderPropertyFlags::DELIVERY_MODE) {
518        Some(src.get_u8())
519    } else {
520        None
521    };
522    let priority = if property_flags.contains(HeaderPropertyFlags::PRIORITY) {
523        Some(src.get_u8())
524    } else {
525        None
526    };
527    let correlation_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CORRELATION_ID);
528    let reply_to = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::REPLY_TO);
529    let expiration = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::EXPIRATION);
530    let message_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::MESSAGE_ID);
531    let timestamp = if property_flags.contains(HeaderPropertyFlags::TIMESTAMP) {
532        Some(src.get_u64())
533    } else {
534        None
535    };
536    let message_type = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::MESSAGE_TYPE);
537    let user_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::USER_ID);
538    let app_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::APP_ID);
539    let cluster_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CLUSTER_ID);
540
541    AMQPFrame::ContentHeader(ContentHeaderFrame {
542        channel,
543        class_id,
544        weight,
545        body_size,
546        prop_flags: property_flags,
547        cluster_id,
548        app_id,
549        user_id,
550        message_type,
551        timestamp,
552        message_id,
553        expiration,
554        reply_to,
555        correlation_id,
556        priority,
557        delivery_mode,
558        headers,
559        content_encoding,
560        content_type,
561    })
562}
563
564fn decode_short_string_flag(
565    src: &mut BytesMut,
566    flags: HeaderPropertyFlags,
567    flag: HeaderPropertyFlags,
568) -> Option<String> {
569    if flags.contains(flag) {
570        Some(decode_short_string(src))
571    } else {
572        None
573    }
574}
575
576fn decode_field_table_flag(
577    src: &mut BytesMut,
578    flags: HeaderPropertyFlags,
579    flag: HeaderPropertyFlags,
580) -> Option<FieldTable> {
581    if flags.contains(flag) {
582        decode_field_table(src)
583    } else {
584        None
585    }
586}
587
588fn decode_value(buf: &mut BytesMut) -> AMQPFieldValue {
589    match buf.get_u8() {
590        b't' => {
591            let bool_value = buf.get_u8() != 0;
592
593            AMQPFieldValue::Bool(bool_value)
594        }
595        b'S' => {
596            let string_value = decode_long_string(buf);
597
598            AMQPFieldValue::LongString(string_value)
599        }
600        b'F' => match decode_field_table(buf) {
601            None => AMQPFieldValue::EmptyFieldTable,
602            Some(table) => AMQPFieldValue::FieldTable(Box::new(table)),
603        },
604        t => panic!("Unknown type {}", t),
605    }
606}
607
608fn decode_short_string(buf: &mut BytesMut) -> String {
609    let len = buf.get_u8() as usize;
610    let sb = buf.split_to(len);
611
612    String::from_utf8(sb.to_vec()).unwrap()
613}
614
615fn decode_long_string(buf: &mut BytesMut) -> String {
616    let len = buf.get_u32() as usize;
617    let sb = buf.split_to(len);
618
619    String::from_utf8(sb.to_vec()).unwrap()
620}
621
622/// Decode a field table
623///
624/// The buffer points to the beginning of the field table which is a `u32` length
625/// information.
626fn decode_field_table(buf: &mut BytesMut) -> Option<HashMap<String, AMQPFieldValue>> {
627    let ft_len = buf.get_u32() as usize;
628
629    if ft_len == 0 {
630        return None;
631    }
632
633    let mut ft_buf = buf.split_to(ft_len);
634    let mut table = HashMap::new();
635
636    while ft_buf.has_remaining() {
637        let field_name = decode_short_string(&mut ft_buf);
638        let field_value = decode_value(&mut ft_buf);
639
640        table.insert(field_name, field_value);
641    }
642
643    Some(table)
644}
645
646fn encode_amqp_frame(buf: &mut BytesMut, frame: AMQPFrame) {
647    match frame {
648        AMQPFrame::Header => buf.put(&b"AMQP\x00\x00\x09\x01"[..]),
649
650        AMQPFrame::Method(ch, cm, args) => encode_method_frame(buf, ch, cm, &args),
651
652        AMQPFrame::ContentHeader(header_frame) => encode_content_header_frame(buf, &header_frame),
653
654        AMQPFrame::ContentBody(body_frame) => encode_content_body_frame(buf, &body_frame),
655
656        AMQPFrame::Heartbeat(channel) => encode_heartbeat_frame(buf, channel),
657    }
658}
659
660fn encode_method_frame(buf: &mut BytesMut, channel: Channel, cm: ClassMethod, args: &MethodFrameArgs) {
661    buf.put_u8(1u8);
662    buf.put_u16(channel);
663
664    let mut fr = BytesMut::with_capacity(4096);
665    fr.put_u32(cm);
666
667    match args {
668        MethodFrameArgs::ConnectionStart(args) => encode_connection_start(&mut fr, args),
669        MethodFrameArgs::ConnectionStartOk(args) => encode_connection_start_ok(&mut fr, args),
670        MethodFrameArgs::ConnectionTune(args) => encode_connection_tune(&mut fr, args),
671        MethodFrameArgs::ConnectionTuneOk(args) => encode_connection_tune_ok(&mut fr, args),
672        MethodFrameArgs::ConnectionOpen(args) => encode_connection_open(&mut fr, args),
673        MethodFrameArgs::ConnectionOpenOk => encode_connection_open_ok(&mut fr),
674        MethodFrameArgs::ConnectionClose(args) => encode_connection_close(&mut fr, args),
675        MethodFrameArgs::ConnectionCloseOk => (),
676        MethodFrameArgs::ChannelOpen => encode_channel_open(&mut fr),
677        MethodFrameArgs::ChannelOpenOk => encode_channel_open_ok(&mut fr),
678        MethodFrameArgs::ChannelClose(args) => encode_channel_close(&mut fr, args),
679        MethodFrameArgs::ChannelCloseOk => (),
680        MethodFrameArgs::ExchangeDeclare(args) => encode_exchange_declare(&mut fr, args),
681        MethodFrameArgs::ExchangeDeclareOk => (),
682        MethodFrameArgs::ExchangeDelete(args) => encode_exchange_delete(&mut fr, args),
683        MethodFrameArgs::ExchangeDeleteOk => (),
684        MethodFrameArgs::QueueDeclare(args) => encode_queue_declare(&mut fr, args),
685        MethodFrameArgs::QueueDeclareOk(args) => encode_queue_declare_ok(&mut fr, args),
686        MethodFrameArgs::QueueBind(args) => encode_queue_bind(&mut fr, args),
687        MethodFrameArgs::QueueBindOk => (),
688        MethodFrameArgs::QueuePurge(args) => encode_queue_purge(&mut fr, args),
689        MethodFrameArgs::QueuePurgeOk(args) => encode_queue_purge_ok(&mut fr, args),
690        MethodFrameArgs::QueueDelete(args) => encode_queue_delete(&mut fr, args),
691        MethodFrameArgs::QueueDeleteOk(args) => encode_queue_delete_ok(&mut fr, args),
692        MethodFrameArgs::QueueUnbind(args) => encode_queue_unbind(&mut fr, args),
693        MethodFrameArgs::QueueUnbindOk => (),
694        MethodFrameArgs::BasicConsume(args) => encode_basic_consume(&mut fr, args),
695        MethodFrameArgs::BasicConsumeOk(args) => encode_basic_consume_ok(&mut fr, args),
696        MethodFrameArgs::BasicCancel(args) => encode_basic_cancel(&mut fr, args),
697        MethodFrameArgs::BasicCancelOk(args) => encode_basic_cancel_ok(&mut fr, args),
698        MethodFrameArgs::BasicGet(args) => encode_basic_get(&mut fr, args),
699        MethodFrameArgs::BasicGetOk(args) => encode_basic_get_ok(&mut fr, args),
700        MethodFrameArgs::BasicGetEmpty => encode_basic_get_empty(&mut fr),
701        MethodFrameArgs::BasicPublish(args) => encode_basic_publish(&mut fr, args),
702        MethodFrameArgs::BasicReturn(args) => encode_basic_return(&mut fr, args),
703        MethodFrameArgs::BasicDeliver(args) => encode_basic_deliver(&mut fr, args),
704        MethodFrameArgs::BasicAck(args) => encode_basic_ack(&mut fr, args),
705        MethodFrameArgs::BasicReject(args) => encode_basic_reject(&mut fr, args),
706        MethodFrameArgs::ConfirmSelect(args) => encode_confirm_select(&mut fr, args),
707        MethodFrameArgs::ConfirmSelectOk => (),
708    }
709
710    buf.put_u32(fr.len() as u32);
711    buf.put(fr);
712    buf.put_u8(0xCE);
713}
714
715fn encode_connection_start(buf: &mut BytesMut, args: &ConnectionStartArgs) {
716    buf.put_u8(args.version_major);
717    buf.put_u8(args.version_minor);
718    encode_field_table(buf, args.properties.as_ref());
719    encode_long_string(buf, &args.mechanisms);
720    encode_long_string(buf, &args.locales);
721}
722
723fn encode_connection_start_ok(buf: &mut BytesMut, args: &ConnectionStartOkArgs) {
724    encode_field_table(buf, args.properties.as_ref());
725    encode_short_string(buf, &args.mechanism);
726    encode_long_string(buf, &args.response);
727    encode_short_string(buf, &args.locale);
728}
729
730fn encode_connection_tune(buf: &mut BytesMut, args: &ConnectionTuneArgs) {
731    buf.put_u16(args.channel_max);
732    buf.put_u32(args.frame_max);
733    buf.put_u16(args.heartbeat);
734}
735
736fn encode_connection_tune_ok(buf: &mut BytesMut, args: &ConnectionTuneOkArgs) {
737    buf.put_u16(args.channel_max);
738    buf.put_u32(args.frame_max);
739    buf.put_u16(args.heartbeat);
740}
741
742fn encode_connection_open(buf: &mut BytesMut, args: &ConnectionOpenArgs) {
743    encode_short_string(buf, &args.virtual_host);
744    encode_short_string(buf, "");
745    let mut flags = 0x00;
746
747    if args.insist {
748        flags |= 0x01;
749    }
750
751    buf.put_u8(flags);
752}
753
/// Encodes the arguments of a `Connection.OpenOk` method: a single zero
/// octet, i.e. a short string of length zero.
fn encode_connection_open_ok(buf: &mut BytesMut) {
    // encode empty short string
    buf.put_u8(0);
}
758
759fn encode_connection_close(buf: &mut BytesMut, args: &ConnectionCloseArgs) {
760    buf.put_u16(args.code);
761    encode_short_string(buf, &args.text);
762    buf.put_u16(args.class_id);
763    buf.put_u16(args.method_id);
764}
765
/// Encodes the arguments of a `Channel.Open` method: a single zero octet,
/// i.e. a short string of length zero.
fn encode_channel_open(buf: &mut BytesMut) {
    // encode empty short string
    buf.put_u8(0);
}
770
/// Encodes the arguments of a `Channel.OpenOk` method: a zero `u32`, i.e. a
/// long string of length zero.
fn encode_channel_open_ok(buf: &mut BytesMut) {
    // encode empty long string
    buf.put_u32(0);
}
775
776fn encode_channel_close(buf: &mut BytesMut, args: &ChannelCloseArgs) {
777    buf.put_u16(args.code);
778    encode_short_string(buf, &args.text);
779    buf.put_u16(args.class_id);
780    buf.put_u16(args.method_id);
781}
782
783fn encode_exchange_declare(buf: &mut BytesMut, args: &ExchangeDeclareArgs) {
784    buf.put_u16(0);
785    encode_short_string(buf, &args.exchange_name);
786    encode_short_string(buf, &args.exchange_type);
787    buf.put_u8(args.flags.bits());
788    encode_field_table(buf, args.args.as_ref());
789}
790
791fn encode_exchange_delete(buf: &mut BytesMut, args: &ExchangeDeleteArgs) {
792    buf.put_u16(0);
793    encode_short_string(buf, &args.exchange_name);
794    buf.put_u8(args.flags.bits());
795}
796
797fn encode_queue_declare(buf: &mut BytesMut, args: &QueueDeclareArgs) {
798    buf.put_u16(0);
799    encode_short_string(buf, &args.name);
800    buf.put_u8(args.flags.bits());
801    encode_field_table(buf, args.args.as_ref());
802}
803
804fn encode_queue_declare_ok(buf: &mut BytesMut, args: &QueueDeclareOkArgs) {
805    encode_short_string(buf, &args.name);
806    buf.put_u32(args.message_count);
807    buf.put_u32(args.consumer_count);
808}
809
810fn encode_queue_bind(buf: &mut BytesMut, args: &QueueBindArgs) {
811    buf.put_u16(0);
812    encode_short_string(buf, &args.queue_name);
813    encode_short_string(buf, &args.exchange_name);
814    encode_short_string(buf, &args.routing_key);
815    buf.put_u8(if args.no_wait { 1 } else { 0 });
816    encode_field_table(buf, args.args.as_ref());
817}
818
819fn encode_queue_purge(buf: &mut BytesMut, args: &QueuePurgeArgs) {
820    buf.put_u16(0);
821    encode_short_string(buf, &args.queue_name);
822    buf.put_u8(if args.no_wait { 1 } else { 0 });
823}
824
825fn encode_queue_purge_ok(buf: &mut BytesMut, args: &QueuePurgeOkArgs) {
826    buf.put_u32(args.message_count);
827}
828
829fn encode_queue_delete(buf: &mut BytesMut, args: &QueueDeleteArgs) {
830    buf.put_u16(0);
831    encode_short_string(buf, &args.queue_name);
832    buf.put_u8(args.flags.bits());
833}
834
835fn encode_queue_delete_ok(buf: &mut BytesMut, args: &QueueDeleteOkArgs) {
836    buf.put_u32(args.message_count);
837}
838
839fn encode_queue_unbind(buf: &mut BytesMut, args: &QueueUnbindArgs) {
840    buf.put_u16(0);
841    encode_short_string(buf, &args.queue_name);
842    encode_short_string(buf, &args.exchange_name);
843    encode_short_string(buf, &args.routing_key);
844    encode_field_table(buf, args.args.as_ref());
845}
846
847fn encode_basic_consume(buf: &mut BytesMut, args: &BasicConsumeArgs) {
848    buf.put_u16(0);
849    encode_short_string(buf, &args.queue);
850    encode_short_string(buf, &args.consumer_tag);
851    buf.put_u8(args.flags.bits());
852    encode_field_table(buf, args.args.as_ref());
853}
854
855fn encode_basic_consume_ok(buf: &mut BytesMut, args: &BasicConsumeOkArgs) {
856    encode_short_string(buf, &args.consumer_tag);
857}
858
859fn encode_basic_cancel(buf: &mut BytesMut, args: &BasicCancelArgs) {
860    encode_short_string(buf, &args.consumer_tag);
861    buf.put_u8(if args.no_wait { 1 } else { 0 });
862}
863
864fn encode_basic_cancel_ok(buf: &mut BytesMut, args: &BasicCancelOkArgs) {
865    encode_short_string(buf, &args.consumer_tag);
866}
867
868fn encode_basic_get(buf: &mut BytesMut, args: &BasicGetArgs) {
869    buf.put_u16(0);
870    encode_short_string(buf, &args.queue);
871    buf.put_u8(if args.no_ack { 1 } else { 0 });
872}
873
874fn encode_basic_get_ok(buf: &mut BytesMut, args: &BasicGetOkArgs) {
875    buf.put_u64(args.delivery_tag);
876    buf.put_u8(if args.redelivered { 1 } else { 0 });
877    encode_short_string(buf, &args.exchange_name);
878    encode_short_string(buf, &args.routing_key);
879    buf.put_u32(args.message_count);
880}
881
882fn encode_basic_get_empty(buf: &mut BytesMut) {
883    buf.put_u8(0);
884}
885
886fn encode_basic_publish(buf: &mut BytesMut, args: &BasicPublishArgs) {
887    buf.put_u16(0);
888    encode_short_string(buf, &args.exchange_name);
889    encode_short_string(buf, &args.routing_key);
890    buf.put_u8(args.flags.bits());
891}
892
893fn encode_basic_return(buf: &mut BytesMut, args: &BasicReturnArgs) {
894    buf.put_u16(args.reply_code);
895    encode_short_string(buf, &args.reply_text);
896    encode_short_string(buf, &args.exchange_name);
897    encode_short_string(buf, &args.routing_key);
898}
899
900fn encode_basic_deliver(buf: &mut BytesMut, args: &BasicDeliverArgs) {
901    encode_short_string(buf, &args.consumer_tag);
902    buf.put_u64(args.delivery_tag);
903    buf.put_u8(if args.redelivered { 1 } else { 0 });
904    encode_short_string(buf, &args.exchange_name);
905    encode_short_string(buf, &args.routing_key);
906}
907
908fn encode_basic_ack(buf: &mut BytesMut, args: &BasicAckArgs) {
909    buf.put_u64(args.delivery_tag);
910    buf.put_u8(if args.multiple { 1 } else { 0 });
911}
912
913fn encode_basic_reject(buf: &mut BytesMut, args: &BasicRejectArgs) {
914    buf.put_u64(args.delivery_tag);
915    buf.put_u8(if args.requeue { 1 } else { 0 });
916}
917
918fn encode_confirm_select(buf: &mut BytesMut, args: &ConfirmSelectArgs) {
919    buf.put_u8(if args.no_wait { 1 } else { 0 });
920}
921
922fn encode_content_header_frame(buf: &mut BytesMut, hf: &ContentHeaderFrame) {
923    buf.put_u8(2u8);
924    buf.put_u16(hf.channel);
925
926    let mut fr_buf = BytesMut::with_capacity(4096);
927    fr_buf.put_u16(hf.class_id);
928    fr_buf.put_u16(hf.weight);
929    fr_buf.put_u64(hf.body_size);
930    fr_buf.put_u16(hf.prop_flags.bits());
931
932    if let Some(s) = hf.content_type.as_ref() {
933        encode_short_string(&mut fr_buf, s);
934    }
935    if let Some(s) = hf.content_encoding.as_ref() {
936        encode_short_string(&mut fr_buf, s);
937    }
938    if hf.prop_flags.contains(HeaderPropertyFlags::HEADERS) {
939        encode_field_table(&mut fr_buf, hf.headers.as_ref());
940    }
941    if let Some(v) = hf.delivery_mode {
942        fr_buf.put_u8(v);
943    }
944    if let Some(v) = hf.priority {
945        fr_buf.put_u8(v);
946    }
947    if let Some(s) = hf.correlation_id.as_ref() {
948        encode_short_string(&mut fr_buf, s);
949    }
950    if let Some(s) = hf.reply_to.as_ref() {
951        encode_short_string(&mut fr_buf, s);
952    }
953    if let Some(s) = hf.expiration.as_ref() {
954        encode_short_string(&mut fr_buf, s);
955    }
956    if let Some(s) = hf.message_id.as_ref() {
957        encode_short_string(&mut fr_buf, s);
958    }
959    if let Some(v) = hf.timestamp {
960        fr_buf.put_u64(v);
961    }
962    if let Some(s) = hf.message_type.as_ref() {
963        encode_short_string(&mut fr_buf, s);
964    }
965    if let Some(s) = hf.user_id.as_ref() {
966        encode_short_string(&mut fr_buf, s);
967    }
968    if let Some(s) = hf.app_id.as_ref() {
969        encode_short_string(&mut fr_buf, s);
970    }
971    if let Some(s) = hf.cluster_id.as_ref() {
972        encode_short_string(&mut fr_buf, s);
973    }
974
975    buf.put_u32(fr_buf.len() as u32);
976    buf.put(fr_buf);
977    buf.put_u8(0xCE);
978}
979
980fn encode_content_body_frame(buf: &mut BytesMut, bf: &ContentBodyFrame) {
981    // TODO buf.reserve()
982    buf.put_u8(3u8);
983    buf.put_u16(bf.channel);
984
985    let mut fr_buf = BytesMut::with_capacity(bf.body.len());
986    fr_buf.put(bf.body.as_slice());
987
988    buf.put_u32(fr_buf.len() as u32);
989    buf.put(fr_buf);
990    buf.put_u8(0xCE);
991}
992
993fn encode_heartbeat_frame(buf: &mut BytesMut, channel: Channel) {
994    buf.put_u16(channel);
995    buf.put_u32(0);
996    buf.put_u8(0xCE);
997}
998
999fn encode_short_string(buf: &mut BytesMut, s: &str) {
1000    // TODO assert! that size is below 256
1001    buf.put_u8(s.len() as u8);
1002    buf.put(s.as_bytes());
1003}
1004
1005fn encode_long_string(buf: &mut BytesMut, s: &str) {
1006    buf.put_u32(s.len() as u32);
1007    buf.put(s.as_bytes());
1008}
1009
1010fn encode_empty_field_table(buf: &mut BytesMut) {
1011    buf.put_u32(0);
1012}
1013
1014fn encode_field_table(buf: &mut BytesMut, ft: Option<&HashMap<String, AMQPFieldValue>>) {
1015    match ft {
1016        None => buf.put_u32(0),
1017        Some(t) => encode_field_table2(buf, t),
1018    }
1019}
1020
1021fn encode_field_table2(buf: &mut BytesMut, ft: &HashMap<String, AMQPFieldValue>) {
1022    let mut ft_buf = BytesMut::with_capacity(4096);
1023
1024    for (name, value) in ft {
1025        encode_short_string(&mut ft_buf, name);
1026
1027        match value {
1028            AMQPFieldValue::Bool(v) => {
1029                ft_buf.put_u8(b't');
1030                ft_buf.put_u8(if *v { 1 } else { 0 });
1031            }
1032            AMQPFieldValue::LongString(v) => {
1033                ft_buf.put_u8(b'S');
1034                ft_buf.put_u32(v.len() as u32);
1035                ft_buf.put(v.as_bytes());
1036            }
1037            AMQPFieldValue::EmptyFieldTable => encode_empty_field_table(&mut ft_buf),
1038            AMQPFieldValue::FieldTable(v) => {
1039                ft_buf.put_u8(b'F');
1040
1041                // TODO we are copying here
1042                encode_field_table2(&mut ft_buf, v);
1043            }
1044        }
1045    }
1046
1047    buf.put_u32(ft_buf.len() as u32);
1048    buf.put(ft_buf);
1049}