ironmq_codec/
codec.rs

1use crate::frame::*;
2use bytes::{Buf, BufMut, BytesMut};
3use std::collections::HashMap;
4use tokio_util::codec::{Decoder, Encoder};
5
6const FRAME_METHOD_FRAME: u8 = 0x01;
7const FRAME_CONTENT_HEADER: u8 = 0x02;
8const FRAME_CONTENT_BODY: u8 = 0x03;
9const FRAME_HEARTBEAT: u8 = 0x08;
10const FRAME_AMQP_VERSION: u8 = 0x41;
11
12/// Placeholder for AMQP encoder and decoder functions.
13pub struct AMQPCodec {}
14
15// TODO change type of encoder, decoder, they should deal with Vec<AMQPFrame>
16
17impl Encoder<AMQPFrame> for AMQPCodec {
18    type Error = std::io::Error;
19
20    fn encode(&mut self, event: AMQPFrame, mut buf: &mut BytesMut) -> Result<(), Self::Error> {
21        match event {
22            AMQPFrame::Header => buf.put(&b"AMQP\x00\x00\x09\x01"[..]),
23
24            AMQPFrame::Method(ch, cm, args) => encode_method_frame(&mut buf, ch, cm, args),
25
26            AMQPFrame::ContentHeader(header_frame) => {
27                encode_content_header_frame(&mut buf, header_frame)
28            }
29
30            AMQPFrame::ContentBody(body_frame) => encode_content_body_frame(&mut buf, body_frame),
31
32            AMQPFrame::Heartbeat(channel) => encode_heartbeat_frame(&mut buf, channel),
33        }
34
35        Ok(())
36    }
37}
38
39impl Decoder for AMQPCodec {
40    type Item = AMQPFrame;
41    type Error = std::io::Error;
42
43    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
44        if src.len() < 8 {
45            Ok(None)
46        } else {
47            match src.get_u8() {
48                FRAME_METHOD_FRAME => {
49                    let channel = src.get_u16();
50                    // TODO amqp frame can be u32 but Buf handles only usize buffes
51                    let frame_len = src.get_u32() as usize;
52
53                    let mut frame_buf = src.split_to(frame_len);
54                    let frame = decode_method_frame(&mut frame_buf, channel);
55
56                    let _frame_separator = src.get_u8();
57
58                    Ok(Some(frame))
59                }
60                FRAME_CONTENT_HEADER => {
61                    let channel = src.get_u16();
62                    let frame_len = src.get_u32() as usize;
63
64                    let mut frame_buf = src.split_to(frame_len);
65                    let frame = decode_content_header_frame(&mut frame_buf, channel);
66
67                    let _frame_separator = src.get_u8();
68
69                    Ok(Some(frame))
70                }
71                FRAME_CONTENT_BODY => {
72                    let channel = src.get_u16();
73                    let body_len = src.get_u32();
74                    let bytes = src.split_to(body_len as usize);
75
76                    let _frame_separator = src.get_u8();
77
78                    // TODO more effective copy
79                    let frame = AMQPFrame::ContentBody(ContentBodyFrame {
80                        channel: channel,
81                        body: bytes.to_vec(),
82                    });
83
84                    Ok(Some(frame))
85                }
86                FRAME_HEARTBEAT => {
87                    let channel = src.get_u16();
88                    let len = src.get_u32();
89                    let _ = src.split_to(len as usize);
90
91                    let _frame_separator = src.get_u8();
92
93                    Ok(Some(AMQPFrame::Heartbeat(channel)))
94                }
95                FRAME_AMQP_VERSION => {
96                    let mut head = [0u8; 7];
97                    src.copy_to_slice(&mut head);
98
99                    // TODO check if version is 0091
100
101                    Ok(Some(AMQPFrame::Header))
102                }
103                f => Err(std::io::Error::new(
104                    std::io::ErrorKind::Other,
105                    format!("Unknown frame {}", f),
106                )),
107            }
108        }
109    }
110}
111
112// TODO have an Error type here, and it should be result<>
113fn decode_method_frame(mut src: &mut BytesMut, channel: u16) -> AMQPFrame {
114    let class_method = src.get_u32();
115
116    let method_frame_args = match class_method {
117        CONNECTION_START => decode_connection_start(&mut src),
118        CONNECTION_START_OK => decode_connection_start_ok(&mut src),
119        CONNECTION_TUNE => decode_connection_tune(&mut src),
120        CONNECTION_TUNE_OK => decode_connection_tune_ok(&mut src),
121        CONNECTION_OPEN => decode_connection_open(&mut src),
122        CONNECTION_OPEN_OK => decode_connection_open_ok(&mut src),
123        CONNECTION_CLOSE => decode_connection_close(&mut src),
124        CONNECTION_CLOSE_OK => MethodFrameArgs::ConnectionCloseOk,
125        CHANNEL_OPEN => decode_channel_open(&mut src),
126        CHANNEL_OPEN_OK => decode_channel_open_ok(&mut src),
127        CHANNEL_CLOSE => decode_channel_close(&mut src),
128        CHANNEL_CLOSE_OK => MethodFrameArgs::ChannelCloseOk,
129        EXCHANGE_DECLARE => decode_exchange_declare(&mut src),
130        EXCHANGE_DECLARE_OK => MethodFrameArgs::ExchangeBindOk,
131        QUEUE_DECLARE => decode_queue_declare(&mut src),
132        QUEUE_DECLARE_OK => decode_queue_declare_ok(&mut src),
133        QUEUE_BIND => decode_queue_bind(&mut src),
134        QUEUE_BIND_OK => MethodFrameArgs::QueueBindOk,
135        BASIC_CONSUME => decode_basic_consume(&mut src),
136        BASIC_CONSUME_OK => decode_basic_consume_ok(&mut src),
137        BASIC_DELIVER => decode_basic_deliver(&mut src),
138        BASIC_PUBLISH => decode_basic_publish(&mut src),
139        _ =>
140            unimplemented!("{:08X}", class_method)
141    };
142
143    AMQPFrame::Method(channel, class_method, method_frame_args)
144}
145
146fn decode_connection_start(mut src: &mut BytesMut) -> MethodFrameArgs {
147    let mut args = ConnectionStartArgs::default();
148    args.version_major = src.get_u8();
149    args.version_minor = src.get_u8();
150    args.properties = decode_field_table(&mut src);
151    args.mechanisms = decode_long_string(&mut src);
152    args.locales = decode_long_string(&mut src);
153
154    //if let Some(ref table) = args.properties {
155    //    if let Some(AMQPFieldValue::FieldTable(cap)) = table.get("capabilities".into()) {
156    //        args.capabilities = Some(**cap.clone());
157    //    }
158    //}
159
160    MethodFrameArgs::ConnectionStart(args)
161}
162
163fn decode_connection_start_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
164    let mut args = ConnectionStartOkArgs::default();
165    args.properties = decode_field_table(&mut src);
166    args.mechanism = decode_short_string(&mut src);
167    args.response = decode_long_string(&mut src);
168    args.locale = decode_short_string(&mut src);
169
170    // TODO init capabilities!
171
172    MethodFrameArgs::ConnectionStartOk(args)
173}
174
175fn decode_connection_tune(src: &mut BytesMut) -> MethodFrameArgs {
176    let mut args = ConnectionTuneArgs::default();
177    args.channel_max = src.get_u16();
178    args.frame_max = src.get_u32();
179    args.heartbeat = src.get_u16();
180
181    MethodFrameArgs::ConnectionTune(args)
182}
183
184fn decode_connection_tune_ok(src: &mut BytesMut) -> MethodFrameArgs {
185    let mut args = ConnectionTuneOkArgs::default();
186    args.channel_max = src.get_u16();
187    args.frame_max = src.get_u32();
188    args.heartbeat = src.get_u16();
189
190    MethodFrameArgs::ConnectionTuneOk(args)
191}
192
193fn decode_connection_open(mut src: &mut BytesMut) -> MethodFrameArgs {
194    let virtual_host = decode_short_string(&mut src);
195    let _reserved = decode_short_string(&mut src);
196    let flags = src.get_u8();
197
198    MethodFrameArgs::ConnectionOpen(ConnectionOpenArgs {
199        virtual_host: virtual_host,
200        insist: flags & 0x01 != 0,
201    })
202}
203
204fn decode_connection_open_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
205    let _ = decode_short_string(&mut src);
206
207    MethodFrameArgs::ConnectionOpenOk
208}
209
210fn decode_connection_close(mut src: &mut BytesMut) -> MethodFrameArgs {
211    let mut args = ConnectionCloseArgs::default();
212    args.code = src.get_u16();
213    args.text = decode_short_string(&mut src);
214    args.class_id = src.get_u16();
215    args.method_id = src.get_u16();
216
217    MethodFrameArgs::ConnectionClose(args)
218}
219
220fn decode_channel_open(mut src: &mut BytesMut) -> MethodFrameArgs {
221    let _ = decode_short_string(&mut src);
222
223    MethodFrameArgs::ChannelOpen
224}
225
226fn decode_channel_open_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
227    let _ = decode_long_string(&mut src);
228
229    MethodFrameArgs::ChannelOpenOk
230}
231
232fn decode_channel_close(mut src: &mut BytesMut) -> MethodFrameArgs {
233    let mut args = ChannelCloseArgs::default();
234    args.code = src.get_u16();
235    args.text = decode_short_string(&mut src);
236    args.class_id = src.get_u16();
237    args.method_id = src.get_u16();
238
239    MethodFrameArgs::ChannelClose(args)
240}
241
242fn decode_exchange_declare(mut src: &mut BytesMut) -> MethodFrameArgs {
243    let mut args = ExchangeDeclareArgs::default();
244    let _ = src.get_u16();
245    args.exchange_name = decode_short_string(&mut src);
246    args.exchange_type = decode_short_string(&mut src);
247    args.flags = ExchangeDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
248    args.args = decode_field_table(&mut src);
249
250    MethodFrameArgs::ExchangeDeclare(args)
251}
252
253fn decode_queue_declare(mut src: &mut BytesMut) -> MethodFrameArgs {
254    let mut args = QueueDeclareArgs::default();
255    let _ = src.get_u16();
256    args.name = decode_short_string(&mut src);
257    args.flags = QueueDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
258    args.args = decode_field_table(&mut src);
259
260    MethodFrameArgs::QueueDeclare(args)
261}
262
263fn decode_queue_declare_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
264    let mut args = QueueDeclareOkArgs::default();
265    args.name = decode_short_string(&mut src);
266    args.message_count = src.get_u32();
267    args.consumer_count = src.get_u32();
268
269    MethodFrameArgs::QueueDeclareOk(args)
270}
271
272fn decode_queue_bind(mut src: &mut BytesMut) -> MethodFrameArgs {
273    let mut args = QueueBindArgs::default();
274    let _ = src.get_u16();
275    args.queue_name = decode_short_string(&mut src);
276    args.exchange_name = decode_short_string(&mut src);
277    args.routing_key = decode_short_string(&mut src);
278
279    args.no_wait = src.get_u8() != 0;
280    args.args = decode_field_table(&mut src);
281
282    MethodFrameArgs::QueueBind(args)
283}
284
285fn decode_basic_consume(mut src: &mut BytesMut) -> MethodFrameArgs {
286    let mut args = BasicConsumeArgs::default();
287    let _ = src.get_u16();
288    args.queue = decode_short_string(&mut src);
289    args.consumer_tag = decode_short_string(&mut src);
290    args.flags = BasicConsumeFlags::from_bits(src.get_u8()).unwrap_or_default();
291    args.args = decode_field_table(&mut src);
292
293    MethodFrameArgs::BasicConsume(args)
294}
295
296fn decode_basic_consume_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
297    let mut args = BasicConsumeOkArgs::default();
298    args.consumer_tag = decode_short_string(&mut src);
299
300    MethodFrameArgs::BasicConsumeOk(args)
301}
302
303fn decode_basic_deliver(mut src: &mut BytesMut) -> MethodFrameArgs {
304    let mut args = BasicDeliverArgs::default();
305    args.consumer_tag = decode_short_string(&mut src);
306    args.delivery_tag = src.get_u64();
307    args.redelivered = src.get_u8() != 0;
308    args.exchange_name = decode_short_string(&mut src);
309    args.routing_key = decode_short_string(&mut src);
310
311    MethodFrameArgs::BasicDeliver(args)
312}
313
314fn decode_basic_publish(mut src: &mut BytesMut) -> MethodFrameArgs {
315    let mut args = BasicPublishArgs::default();
316    let _ = src.get_u16();
317    args.exchange_name = decode_short_string(&mut src);
318    args.routing_key = decode_short_string(&mut src);
319    args.flags = BasicPublishFlags::from_bits(src.get_u8()).unwrap_or_default();
320
321    MethodFrameArgs::BasicPublish(args)
322}
323
324
325fn decode_content_header_frame(src: &mut BytesMut, channel: u16) -> AMQPFrame {
326    let class_id = src.get_u16();
327    let weight = src.get_u16();
328    let body_size = src.get_u64();
329    let property_flags = src.get_u16();
330    // TODO property list, it seems that we need to know from the class_id what is the type list
331
332    AMQPFrame::ContentHeader(ContentHeaderFrame {
333        channel: channel,
334        class_id: class_id,
335        weight: weight,
336        body_size: body_size,
337        prop_flags: property_flags,
338        args: vec![],
339    })
340}
341
342fn decode_value(mut buf: &mut BytesMut) -> AMQPFieldValue {
343    match buf.get_u8() {
344        b't' => {
345            let bool_value = buf.get_u8() != 0;
346
347            AMQPFieldValue::Bool(bool_value)
348        }
349        b'S' => {
350            let string_value = decode_long_string(&mut buf);
351
352            AMQPFieldValue::LongString(string_value)
353        }
354        b'F' => match decode_field_table(&mut buf) {
355            None => AMQPFieldValue::EmptyFieldTable,
356            Some(table) => AMQPFieldValue::FieldTable(Box::new(table)),
357        },
358        t => panic!("Unknown type {}", t),
359    }
360}
361
362fn decode_short_string(buf: &mut BytesMut) -> String {
363    let len = buf.get_u8() as usize;
364    let sb = buf.split_to(len);
365
366    String::from_utf8(sb.to_vec()).unwrap()
367}
368
369fn decode_long_string(buf: &mut BytesMut) -> String {
370    let len = buf.get_u32() as usize;
371    let sb = buf.split_to(len);
372
373    String::from_utf8(sb.to_vec()).unwrap()
374}
375
376/// Decode a field table
377///
378/// The buffer points to the beginning of the field table which is a `u32` length
379/// information.
380fn decode_field_table(buf: &mut BytesMut) -> Option<HashMap<String, AMQPFieldValue>> {
381    let ft_len = buf.get_u32() as usize;
382
383    if ft_len == 0 {
384        return None;
385    }
386
387    let mut ft_buf = buf.split_to(ft_len);
388    let mut table = HashMap::new();
389
390    while ft_buf.has_remaining() {
391        let field_name = decode_short_string(&mut ft_buf);
392        let field_value = decode_value(&mut ft_buf);
393
394        table.insert(field_name, field_value);
395    }
396
397    Some(table)
398}
399
400fn encode_method_frame(
401    buf: &mut BytesMut,
402    channel: Channel,
403    cm: ClassMethod,
404    args: MethodFrameArgs,
405) {
406    buf.put_u8(1u8);
407    buf.put_u16(channel);
408
409    let mut fr = BytesMut::with_capacity(4096);
410    fr.put_u32(cm);
411
412    match args {
413        MethodFrameArgs::ConnectionStart(args) => encode_connection_start(&mut fr, args),
414        MethodFrameArgs::ConnectionStartOk(args) => encode_connection_start_ok(&mut fr, args),
415        MethodFrameArgs::ConnectionTune(args) => encode_connection_tune(&mut fr, args),
416        MethodFrameArgs::ConnectionTuneOk(args) => encode_connection_tune_ok(&mut fr, args),
417        MethodFrameArgs::ConnectionOpen(args) => encode_connection_open(&mut fr, args),
418        MethodFrameArgs::ConnectionOpenOk => encode_connection_open_ok(&mut fr),
419        MethodFrameArgs::ConnectionClose(args) => encode_connection_close(&mut fr, args),
420        MethodFrameArgs::ConnectionCloseOk => (),
421        MethodFrameArgs::ChannelOpen => encode_channel_open(&mut fr),
422        MethodFrameArgs::ChannelOpenOk => encode_channel_open_ok(&mut fr),
423        MethodFrameArgs::ChannelClose(args) => encode_channel_close(&mut fr, args),
424        MethodFrameArgs::ChannelCloseOk => (),
425        MethodFrameArgs::ExchangeDeclare(args) => encode_exchange_declare(&mut fr, args),
426        MethodFrameArgs::ExchangeDeclareOk => (),
427        MethodFrameArgs::ExchangeBind(args) => encode_exchange_bind(&mut fr, args),
428        MethodFrameArgs::ExchangeBindOk => (),
429        MethodFrameArgs::QueueDeclare(args) => encode_queue_declare(&mut fr, args),
430        MethodFrameArgs::QueueDeclareOk(args) => encode_queue_declare_ok(&mut fr, args),
431        MethodFrameArgs::QueueBind(args) => encode_queue_bind(&mut fr, args),
432        MethodFrameArgs::QueueBindOk => (),
433        MethodFrameArgs::BasicPublish(args) => encode_basic_publish(&mut fr, args),
434        MethodFrameArgs::BasicConsume(args) => encode_basic_consume(&mut fr, args),
435        MethodFrameArgs::BasicConsumeOk(args) => encode_basic_consume_ok(&mut fr, args),
436        MethodFrameArgs::BasicDeliver(args) => encode_basic_deliver(&mut fr, args)
437    }
438
439    buf.put_u32(fr.len() as u32);
440    buf.put(fr);
441    buf.put_u8(0xCE);
442}
443
444fn encode_connection_start(mut buf: &mut BytesMut, args: ConnectionStartArgs) {
445    buf.put_u8(args.version_major);
446    buf.put_u8(args.version_minor);
447    encode_field_table(&mut buf, args.properties);
448    encode_long_string(&mut buf, args.mechanisms);
449    encode_long_string(&mut buf, args.locales);
450}
451
452fn encode_connection_start_ok(mut buf: &mut BytesMut, args: ConnectionStartOkArgs) {
453    encode_field_table(&mut buf, args.properties);
454    encode_short_string(&mut buf, args.mechanism);
455    encode_long_string(&mut buf, args.response);
456    encode_short_string(&mut buf, args.locale);
457}
458
459fn encode_connection_tune(buf: &mut BytesMut, args: ConnectionTuneArgs) {
460    buf.put_u16(args.channel_max);
461    buf.put_u32(args.frame_max);
462    buf.put_u16(args.heartbeat);
463}
464
465fn encode_connection_tune_ok(buf: &mut BytesMut, args: ConnectionTuneOkArgs) {
466    buf.put_u16(args.channel_max);
467    buf.put_u32(args.frame_max);
468    buf.put_u16(args.heartbeat);
469}
470
471fn encode_connection_open(buf: &mut BytesMut, args: ConnectionOpenArgs) {
472    encode_short_string(buf, args.virtual_host);
473    encode_short_string(buf, "".into());
474    let mut flags = 0x00;
475
476    if args.insist {
477        flags |= 0x01;
478    }
479
480    buf.put_u8(flags);
481}
482
483fn encode_connection_open_ok(buf: &mut BytesMut) {
484    // encode empty short string
485    buf.put_u8(0);
486}
487
488fn encode_connection_close(mut buf: &mut BytesMut, args: ConnectionCloseArgs) {
489    buf.put_u16(args.code);
490    encode_short_string(&mut buf, args.text);
491    buf.put_u16(args.class_id);
492    buf.put_u16(args.method_id);
493}
494
495fn encode_channel_open(buf: &mut BytesMut) {
496    // encode empty short string
497    buf.put_u8(0);
498}
499
500fn encode_channel_open_ok(buf: &mut BytesMut) {
501    // encode empty long string
502    buf.put_u32(0);
503}
504
505fn encode_channel_close(mut buf: &mut BytesMut, args: ChannelCloseArgs) {
506    buf.put_u16(args.code);
507    encode_short_string(&mut buf, args.text);
508    buf.put_u16(args.class_id);
509    buf.put_u16(args.method_id);
510}
511
512fn encode_exchange_declare(mut buf: &mut BytesMut, args: ExchangeDeclareArgs) {
513    buf.put_u16(0);
514    encode_short_string(&mut buf, args.exchange_name);
515    encode_short_string(&mut buf, args.exchange_type);
516    buf.put_u8(args.flags.bits());
517    encode_empty_field_table(&mut buf);
518}
519
520fn encode_exchange_bind(mut buf: &mut BytesMut, args: ExchangeBindArgs) {
521    buf.put_u16(0);
522    encode_short_string(&mut buf, args.destination);
523    encode_short_string(&mut buf, args.source);
524    encode_short_string(&mut buf, args.routing_key);
525    buf.put_u8(if args.no_wait { 1 } else { 0 });
526    encode_empty_field_table(&mut buf);
527}
528
529fn encode_queue_declare(mut buf: &mut BytesMut, args: QueueDeclareArgs) {
530    buf.put_u16(0);
531    encode_short_string(&mut buf, args.name);
532    buf.put_u8(args.flags.bits());
533    encode_empty_field_table(&mut buf);
534}
535
536fn encode_queue_declare_ok(mut buf: &mut BytesMut, args: QueueDeclareOkArgs) {
537    encode_short_string(&mut buf, args.name);
538    buf.put_u32(args.message_count);
539    buf.put_u32(args.consumer_count);
540}
541
542fn encode_queue_bind(mut buf: &mut BytesMut, args: QueueBindArgs) {
543    buf.put_u16(0);
544    encode_short_string(&mut buf, args.queue_name);
545    encode_short_string(&mut buf, args.exchange_name);
546    encode_short_string(&mut buf, args.routing_key);
547    buf.put_u8(if args.no_wait { 1 } else { 0 });
548    encode_empty_field_table(&mut buf);
549}
550
551fn encode_basic_consume(mut buf: &mut BytesMut, args: BasicConsumeArgs) {
552    buf.put_u16(0);
553    encode_short_string(&mut buf, args.queue);
554    encode_short_string(&mut buf, args.consumer_tag);
555    buf.put_u8(args.flags.bits());
556    encode_empty_field_table(&mut buf);
557}
558
559fn encode_basic_consume_ok(mut buf: &mut BytesMut, args: BasicConsumeOkArgs) {
560    encode_short_string(&mut buf, args.consumer_tag);
561}
562
563fn encode_basic_deliver(mut buf: &mut BytesMut, args: BasicDeliverArgs) {
564    encode_short_string(&mut buf, args.consumer_tag);
565    buf.put_u64(args.delivery_tag);
566    buf.put_u8(if args.redelivered { 1 } else { 0 });
567    encode_short_string(&mut buf, args.exchange_name);
568    encode_short_string(&mut buf, args.routing_key);
569}
570
571        //BASIC_DELIVER => vec![t_ss!(), t_u64!(), t_u8!(), t_ss!(), t_ss!()],
572fn encode_basic_publish(mut buf: &mut BytesMut, args: BasicPublishArgs) {
573    buf.put_u16(0);
574    encode_short_string(&mut buf, args.exchange_name);
575    encode_short_string(&mut buf, args.routing_key);
576    buf.put_u8(args.flags.bits());
577}
578
579fn encode_content_header_frame(buf: &mut BytesMut, hf: ContentHeaderFrame) {
580    buf.put_u8(2u8);
581    buf.put_u16(hf.channel);
582
583    let mut fr_buf = BytesMut::with_capacity(4096);
584    fr_buf.put_u16(hf.class_id);
585    fr_buf.put_u16(hf.weight);
586    fr_buf.put_u64(hf.body_size);
587    fr_buf.put_u16(hf.prop_flags);
588
589    // TODO encode property list here
590
591    buf.put_u32(fr_buf.len() as u32);
592    buf.put(fr_buf);
593    buf.put_u8(0xCE);
594}
595
596fn encode_content_body_frame(buf: &mut BytesMut, bf: ContentBodyFrame) {
597    buf.put_u8(3u8);
598    buf.put_u16(bf.channel);
599
600    let mut fr_buf = BytesMut::with_capacity(bf.body.len());
601    fr_buf.put(bf.body.as_slice());
602
603    buf.put_u32(fr_buf.len() as u32);
604    buf.put(fr_buf);
605    buf.put_u8(0xCE);
606}
607
608fn encode_heartbeat_frame(buf: &mut BytesMut, channel: Channel) {
609    buf.put_u16(channel);
610    buf.put_u32(0);
611    buf.put_u8(0xCE);
612}
613
614fn encode_short_string(buf: &mut BytesMut, s: String) {
615    // TODO assert! that size is below 256
616    buf.put_u8(s.len() as u8);
617    buf.put(s.as_bytes());
618}
619
620fn encode_long_string(buf: &mut BytesMut, s: String) {
621    buf.put_u32(s.len() as u32);
622    buf.put(s.as_bytes());
623}
624
625fn encode_empty_field_table(buf: &mut BytesMut) {
626    buf.put_u32(0);
627}
628
629fn encode_field_table(mut buf: &mut BytesMut, ft: Option<HashMap<String, AMQPFieldValue>>) {
630    match ft {
631        None => buf.put_u32(0),
632        Some(t) => encode_field_table2(&mut buf, t),
633    }
634}
635
636fn encode_field_table2(buf: &mut BytesMut, ft: HashMap<String, AMQPFieldValue>) {
637    let mut ft_buf = BytesMut::with_capacity(4096);
638
639    for (name, value) in ft {
640        encode_short_string(&mut ft_buf, name);
641
642        match value {
643            AMQPFieldValue::Bool(v) => {
644                ft_buf.put_u8(b't');
645                ft_buf.put_u8(if v { 1 } else { 0 });
646            }
647            AMQPFieldValue::LongString(v) => {
648                ft_buf.put_u8(b'S');
649                ft_buf.put_u32(v.len() as u32);
650                ft_buf.put(v.as_bytes());
651            }
652            AMQPFieldValue::EmptyFieldTable => encode_empty_field_table(&mut ft_buf),
653            AMQPFieldValue::FieldTable(v) => {
654                ft_buf.put_u8(b'F');
655
656                // TODO we are copying here
657                encode_field_table2(&mut ft_buf, *v);
658            }
659        }
660    }
661
662    buf.put_u32(ft_buf.len() as u32);
663    buf.put(ft_buf);
664}
665
666#[allow(dead_code)]
667fn dump(buf: &BytesMut) {
668    let mut cloned = buf.clone();
669    let mut i: usize = 0;
670    let mut text: Vec<u8> = Vec::new();
671
672    println!("---");
673
674    while cloned.has_remaining() {
675        let b = cloned.get_u8();
676
677        print!("{:02X} ", b);
678
679        if (b as char).is_alphanumeric() {
680            text.push(b);
681        } else {
682            text.push(b'.');
683        }
684
685        i += 1;
686
687        if i % 16 == 0 {
688            println!("{}", std::str::from_utf8(&text).unwrap_or_default());
689            text.clear();
690        }
691    }
692
693    println!("---");
694}