1use crate::frame::*;
2use bytes::{Buf, BufMut, BytesMut};
3use std::collections::HashMap;
4use tokio_util::codec::{Decoder, Encoder};
5
// Values of the first octet of an incoming frame, used by the decoder to
// decide how the rest of the frame is parsed.

/// Frame carrying a method call (class/method id plus arguments).
const FRAME_METHOD_FRAME: u8 = 0x01;
/// Frame carrying the content header of a message.
const FRAME_CONTENT_HEADER: u8 = 0x02;
/// Frame carrying a chunk of message body.
const FRAME_CONTENT_BODY: u8 = 0x03;
/// Heartbeat frame.
const FRAME_HEARTBEAT: u8 = 0x08;
/// ASCII `'A'`: first byte of the 8-byte `AMQP...` protocol header.
const FRAME_AMQP_VERSION: u8 = 0x41;
11
/// Stateless codec translating between raw bytes and [`AMQPFrame`] values.
///
/// It implements both `Encoder<AMQPFrame>` and `Decoder` below.
pub struct AMQPCodec {}
14
15impl Encoder<AMQPFrame> for AMQPCodec {
18 type Error = std::io::Error;
19
20 fn encode(&mut self, event: AMQPFrame, mut buf: &mut BytesMut) -> Result<(), Self::Error> {
21 match event {
22 AMQPFrame::Header => buf.put(&b"AMQP\x00\x00\x09\x01"[..]),
23
24 AMQPFrame::Method(ch, cm, args) => encode_method_frame(&mut buf, ch, cm, args),
25
26 AMQPFrame::ContentHeader(header_frame) => {
27 encode_content_header_frame(&mut buf, header_frame)
28 }
29
30 AMQPFrame::ContentBody(body_frame) => encode_content_body_frame(&mut buf, body_frame),
31
32 AMQPFrame::Heartbeat(channel) => encode_heartbeat_frame(&mut buf, channel),
33 }
34
35 Ok(())
36 }
37}
38
39impl Decoder for AMQPCodec {
40 type Item = AMQPFrame;
41 type Error = std::io::Error;
42
43 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
44 if src.len() < 8 {
45 Ok(None)
46 } else {
47 match src.get_u8() {
48 FRAME_METHOD_FRAME => {
49 let channel = src.get_u16();
50 let frame_len = src.get_u32() as usize;
52
53 let mut frame_buf = src.split_to(frame_len);
54 let frame = decode_method_frame(&mut frame_buf, channel);
55
56 let _frame_separator = src.get_u8();
57
58 Ok(Some(frame))
59 }
60 FRAME_CONTENT_HEADER => {
61 let channel = src.get_u16();
62 let frame_len = src.get_u32() as usize;
63
64 let mut frame_buf = src.split_to(frame_len);
65 let frame = decode_content_header_frame(&mut frame_buf, channel);
66
67 let _frame_separator = src.get_u8();
68
69 Ok(Some(frame))
70 }
71 FRAME_CONTENT_BODY => {
72 let channel = src.get_u16();
73 let body_len = src.get_u32();
74 let bytes = src.split_to(body_len as usize);
75
76 let _frame_separator = src.get_u8();
77
78 let frame = AMQPFrame::ContentBody(ContentBodyFrame {
80 channel: channel,
81 body: bytes.to_vec(),
82 });
83
84 Ok(Some(frame))
85 }
86 FRAME_HEARTBEAT => {
87 let channel = src.get_u16();
88 let len = src.get_u32();
89 let _ = src.split_to(len as usize);
90
91 let _frame_separator = src.get_u8();
92
93 Ok(Some(AMQPFrame::Heartbeat(channel)))
94 }
95 FRAME_AMQP_VERSION => {
96 let mut head = [0u8; 7];
97 src.copy_to_slice(&mut head);
98
99 Ok(Some(AMQPFrame::Header))
102 }
103 f => Err(std::io::Error::new(
104 std::io::ErrorKind::Other,
105 format!("Unknown frame {}", f),
106 )),
107 }
108 }
109 }
110}
111
112fn decode_method_frame(mut src: &mut BytesMut, channel: u16) -> AMQPFrame {
114 let class_method = src.get_u32();
115
116 let method_frame_args = match class_method {
117 CONNECTION_START => decode_connection_start(&mut src),
118 CONNECTION_START_OK => decode_connection_start_ok(&mut src),
119 CONNECTION_TUNE => decode_connection_tune(&mut src),
120 CONNECTION_TUNE_OK => decode_connection_tune_ok(&mut src),
121 CONNECTION_OPEN => decode_connection_open(&mut src),
122 CONNECTION_OPEN_OK => decode_connection_open_ok(&mut src),
123 CONNECTION_CLOSE => decode_connection_close(&mut src),
124 CONNECTION_CLOSE_OK => MethodFrameArgs::ConnectionCloseOk,
125 CHANNEL_OPEN => decode_channel_open(&mut src),
126 CHANNEL_OPEN_OK => decode_channel_open_ok(&mut src),
127 CHANNEL_CLOSE => decode_channel_close(&mut src),
128 CHANNEL_CLOSE_OK => MethodFrameArgs::ChannelCloseOk,
129 EXCHANGE_DECLARE => decode_exchange_declare(&mut src),
130 EXCHANGE_DECLARE_OK => MethodFrameArgs::ExchangeBindOk,
131 QUEUE_DECLARE => decode_queue_declare(&mut src),
132 QUEUE_DECLARE_OK => decode_queue_declare_ok(&mut src),
133 QUEUE_BIND => decode_queue_bind(&mut src),
134 QUEUE_BIND_OK => MethodFrameArgs::QueueBindOk,
135 BASIC_CONSUME => decode_basic_consume(&mut src),
136 BASIC_CONSUME_OK => decode_basic_consume_ok(&mut src),
137 BASIC_DELIVER => decode_basic_deliver(&mut src),
138 BASIC_PUBLISH => decode_basic_publish(&mut src),
139 _ =>
140 unimplemented!("{:08X}", class_method)
141 };
142
143 AMQPFrame::Method(channel, class_method, method_frame_args)
144}
145
146fn decode_connection_start(mut src: &mut BytesMut) -> MethodFrameArgs {
147 let mut args = ConnectionStartArgs::default();
148 args.version_major = src.get_u8();
149 args.version_minor = src.get_u8();
150 args.properties = decode_field_table(&mut src);
151 args.mechanisms = decode_long_string(&mut src);
152 args.locales = decode_long_string(&mut src);
153
154 MethodFrameArgs::ConnectionStart(args)
161}
162
163fn decode_connection_start_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
164 let mut args = ConnectionStartOkArgs::default();
165 args.properties = decode_field_table(&mut src);
166 args.mechanism = decode_short_string(&mut src);
167 args.response = decode_long_string(&mut src);
168 args.locale = decode_short_string(&mut src);
169
170 MethodFrameArgs::ConnectionStartOk(args)
173}
174
175fn decode_connection_tune(src: &mut BytesMut) -> MethodFrameArgs {
176 let mut args = ConnectionTuneArgs::default();
177 args.channel_max = src.get_u16();
178 args.frame_max = src.get_u32();
179 args.heartbeat = src.get_u16();
180
181 MethodFrameArgs::ConnectionTune(args)
182}
183
184fn decode_connection_tune_ok(src: &mut BytesMut) -> MethodFrameArgs {
185 let mut args = ConnectionTuneOkArgs::default();
186 args.channel_max = src.get_u16();
187 args.frame_max = src.get_u32();
188 args.heartbeat = src.get_u16();
189
190 MethodFrameArgs::ConnectionTuneOk(args)
191}
192
193fn decode_connection_open(mut src: &mut BytesMut) -> MethodFrameArgs {
194 let virtual_host = decode_short_string(&mut src);
195 let _reserved = decode_short_string(&mut src);
196 let flags = src.get_u8();
197
198 MethodFrameArgs::ConnectionOpen(ConnectionOpenArgs {
199 virtual_host: virtual_host,
200 insist: flags & 0x01 != 0,
201 })
202}
203
204fn decode_connection_open_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
205 let _ = decode_short_string(&mut src);
206
207 MethodFrameArgs::ConnectionOpenOk
208}
209
210fn decode_connection_close(mut src: &mut BytesMut) -> MethodFrameArgs {
211 let mut args = ConnectionCloseArgs::default();
212 args.code = src.get_u16();
213 args.text = decode_short_string(&mut src);
214 args.class_id = src.get_u16();
215 args.method_id = src.get_u16();
216
217 MethodFrameArgs::ConnectionClose(args)
218}
219
220fn decode_channel_open(mut src: &mut BytesMut) -> MethodFrameArgs {
221 let _ = decode_short_string(&mut src);
222
223 MethodFrameArgs::ChannelOpen
224}
225
226fn decode_channel_open_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
227 let _ = decode_long_string(&mut src);
228
229 MethodFrameArgs::ChannelOpenOk
230}
231
232fn decode_channel_close(mut src: &mut BytesMut) -> MethodFrameArgs {
233 let mut args = ChannelCloseArgs::default();
234 args.code = src.get_u16();
235 args.text = decode_short_string(&mut src);
236 args.class_id = src.get_u16();
237 args.method_id = src.get_u16();
238
239 MethodFrameArgs::ChannelClose(args)
240}
241
242fn decode_exchange_declare(mut src: &mut BytesMut) -> MethodFrameArgs {
243 let mut args = ExchangeDeclareArgs::default();
244 let _ = src.get_u16();
245 args.exchange_name = decode_short_string(&mut src);
246 args.exchange_type = decode_short_string(&mut src);
247 args.flags = ExchangeDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
248 args.args = decode_field_table(&mut src);
249
250 MethodFrameArgs::ExchangeDeclare(args)
251}
252
253fn decode_queue_declare(mut src: &mut BytesMut) -> MethodFrameArgs {
254 let mut args = QueueDeclareArgs::default();
255 let _ = src.get_u16();
256 args.name = decode_short_string(&mut src);
257 args.flags = QueueDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
258 args.args = decode_field_table(&mut src);
259
260 MethodFrameArgs::QueueDeclare(args)
261}
262
263fn decode_queue_declare_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
264 let mut args = QueueDeclareOkArgs::default();
265 args.name = decode_short_string(&mut src);
266 args.message_count = src.get_u32();
267 args.consumer_count = src.get_u32();
268
269 MethodFrameArgs::QueueDeclareOk(args)
270}
271
272fn decode_queue_bind(mut src: &mut BytesMut) -> MethodFrameArgs {
273 let mut args = QueueBindArgs::default();
274 let _ = src.get_u16();
275 args.queue_name = decode_short_string(&mut src);
276 args.exchange_name = decode_short_string(&mut src);
277 args.routing_key = decode_short_string(&mut src);
278
279 args.no_wait = src.get_u8() != 0;
280 args.args = decode_field_table(&mut src);
281
282 MethodFrameArgs::QueueBind(args)
283}
284
285fn decode_basic_consume(mut src: &mut BytesMut) -> MethodFrameArgs {
286 let mut args = BasicConsumeArgs::default();
287 let _ = src.get_u16();
288 args.queue = decode_short_string(&mut src);
289 args.consumer_tag = decode_short_string(&mut src);
290 args.flags = BasicConsumeFlags::from_bits(src.get_u8()).unwrap_or_default();
291 args.args = decode_field_table(&mut src);
292
293 MethodFrameArgs::BasicConsume(args)
294}
295
296fn decode_basic_consume_ok(mut src: &mut BytesMut) -> MethodFrameArgs {
297 let mut args = BasicConsumeOkArgs::default();
298 args.consumer_tag = decode_short_string(&mut src);
299
300 MethodFrameArgs::BasicConsumeOk(args)
301}
302
303fn decode_basic_deliver(mut src: &mut BytesMut) -> MethodFrameArgs {
304 let mut args = BasicDeliverArgs::default();
305 args.consumer_tag = decode_short_string(&mut src);
306 args.delivery_tag = src.get_u64();
307 args.redelivered = src.get_u8() != 0;
308 args.exchange_name = decode_short_string(&mut src);
309 args.routing_key = decode_short_string(&mut src);
310
311 MethodFrameArgs::BasicDeliver(args)
312}
313
314fn decode_basic_publish(mut src: &mut BytesMut) -> MethodFrameArgs {
315 let mut args = BasicPublishArgs::default();
316 let _ = src.get_u16();
317 args.exchange_name = decode_short_string(&mut src);
318 args.routing_key = decode_short_string(&mut src);
319 args.flags = BasicPublishFlags::from_bits(src.get_u8()).unwrap_or_default();
320
321 MethodFrameArgs::BasicPublish(args)
322}
323
324
325fn decode_content_header_frame(src: &mut BytesMut, channel: u16) -> AMQPFrame {
326 let class_id = src.get_u16();
327 let weight = src.get_u16();
328 let body_size = src.get_u64();
329 let property_flags = src.get_u16();
330 AMQPFrame::ContentHeader(ContentHeaderFrame {
333 channel: channel,
334 class_id: class_id,
335 weight: weight,
336 body_size: body_size,
337 prop_flags: property_flags,
338 args: vec![],
339 })
340}
341
342fn decode_value(mut buf: &mut BytesMut) -> AMQPFieldValue {
343 match buf.get_u8() {
344 b't' => {
345 let bool_value = buf.get_u8() != 0;
346
347 AMQPFieldValue::Bool(bool_value)
348 }
349 b'S' => {
350 let string_value = decode_long_string(&mut buf);
351
352 AMQPFieldValue::LongString(string_value)
353 }
354 b'F' => match decode_field_table(&mut buf) {
355 None => AMQPFieldValue::EmptyFieldTable,
356 Some(table) => AMQPFieldValue::FieldTable(Box::new(table)),
357 },
358 t => panic!("Unknown type {}", t),
359 }
360}
361
362fn decode_short_string(buf: &mut BytesMut) -> String {
363 let len = buf.get_u8() as usize;
364 let sb = buf.split_to(len);
365
366 String::from_utf8(sb.to_vec()).unwrap()
367}
368
369fn decode_long_string(buf: &mut BytesMut) -> String {
370 let len = buf.get_u32() as usize;
371 let sb = buf.split_to(len);
372
373 String::from_utf8(sb.to_vec()).unwrap()
374}
375
376fn decode_field_table(buf: &mut BytesMut) -> Option<HashMap<String, AMQPFieldValue>> {
381 let ft_len = buf.get_u32() as usize;
382
383 if ft_len == 0 {
384 return None;
385 }
386
387 let mut ft_buf = buf.split_to(ft_len);
388 let mut table = HashMap::new();
389
390 while ft_buf.has_remaining() {
391 let field_name = decode_short_string(&mut ft_buf);
392 let field_value = decode_value(&mut ft_buf);
393
394 table.insert(field_name, field_value);
395 }
396
397 Some(table)
398}
399
400fn encode_method_frame(
401 buf: &mut BytesMut,
402 channel: Channel,
403 cm: ClassMethod,
404 args: MethodFrameArgs,
405) {
406 buf.put_u8(1u8);
407 buf.put_u16(channel);
408
409 let mut fr = BytesMut::with_capacity(4096);
410 fr.put_u32(cm);
411
412 match args {
413 MethodFrameArgs::ConnectionStart(args) => encode_connection_start(&mut fr, args),
414 MethodFrameArgs::ConnectionStartOk(args) => encode_connection_start_ok(&mut fr, args),
415 MethodFrameArgs::ConnectionTune(args) => encode_connection_tune(&mut fr, args),
416 MethodFrameArgs::ConnectionTuneOk(args) => encode_connection_tune_ok(&mut fr, args),
417 MethodFrameArgs::ConnectionOpen(args) => encode_connection_open(&mut fr, args),
418 MethodFrameArgs::ConnectionOpenOk => encode_connection_open_ok(&mut fr),
419 MethodFrameArgs::ConnectionClose(args) => encode_connection_close(&mut fr, args),
420 MethodFrameArgs::ConnectionCloseOk => (),
421 MethodFrameArgs::ChannelOpen => encode_channel_open(&mut fr),
422 MethodFrameArgs::ChannelOpenOk => encode_channel_open_ok(&mut fr),
423 MethodFrameArgs::ChannelClose(args) => encode_channel_close(&mut fr, args),
424 MethodFrameArgs::ChannelCloseOk => (),
425 MethodFrameArgs::ExchangeDeclare(args) => encode_exchange_declare(&mut fr, args),
426 MethodFrameArgs::ExchangeDeclareOk => (),
427 MethodFrameArgs::ExchangeBind(args) => encode_exchange_bind(&mut fr, args),
428 MethodFrameArgs::ExchangeBindOk => (),
429 MethodFrameArgs::QueueDeclare(args) => encode_queue_declare(&mut fr, args),
430 MethodFrameArgs::QueueDeclareOk(args) => encode_queue_declare_ok(&mut fr, args),
431 MethodFrameArgs::QueueBind(args) => encode_queue_bind(&mut fr, args),
432 MethodFrameArgs::QueueBindOk => (),
433 MethodFrameArgs::BasicPublish(args) => encode_basic_publish(&mut fr, args),
434 MethodFrameArgs::BasicConsume(args) => encode_basic_consume(&mut fr, args),
435 MethodFrameArgs::BasicConsumeOk(args) => encode_basic_consume_ok(&mut fr, args),
436 MethodFrameArgs::BasicDeliver(args) => encode_basic_deliver(&mut fr, args)
437 }
438
439 buf.put_u32(fr.len() as u32);
440 buf.put(fr);
441 buf.put_u8(0xCE);
442}
443
444fn encode_connection_start(mut buf: &mut BytesMut, args: ConnectionStartArgs) {
445 buf.put_u8(args.version_major);
446 buf.put_u8(args.version_minor);
447 encode_field_table(&mut buf, args.properties);
448 encode_long_string(&mut buf, args.mechanisms);
449 encode_long_string(&mut buf, args.locales);
450}
451
452fn encode_connection_start_ok(mut buf: &mut BytesMut, args: ConnectionStartOkArgs) {
453 encode_field_table(&mut buf, args.properties);
454 encode_short_string(&mut buf, args.mechanism);
455 encode_long_string(&mut buf, args.response);
456 encode_short_string(&mut buf, args.locale);
457}
458
459fn encode_connection_tune(buf: &mut BytesMut, args: ConnectionTuneArgs) {
460 buf.put_u16(args.channel_max);
461 buf.put_u32(args.frame_max);
462 buf.put_u16(args.heartbeat);
463}
464
465fn encode_connection_tune_ok(buf: &mut BytesMut, args: ConnectionTuneOkArgs) {
466 buf.put_u16(args.channel_max);
467 buf.put_u32(args.frame_max);
468 buf.put_u16(args.heartbeat);
469}
470
471fn encode_connection_open(buf: &mut BytesMut, args: ConnectionOpenArgs) {
472 encode_short_string(buf, args.virtual_host);
473 encode_short_string(buf, "".into());
474 let mut flags = 0x00;
475
476 if args.insist {
477 flags |= 0x01;
478 }
479
480 buf.put_u8(flags);
481}
482
483fn encode_connection_open_ok(buf: &mut BytesMut) {
484 buf.put_u8(0);
486}
487
488fn encode_connection_close(mut buf: &mut BytesMut, args: ConnectionCloseArgs) {
489 buf.put_u16(args.code);
490 encode_short_string(&mut buf, args.text);
491 buf.put_u16(args.class_id);
492 buf.put_u16(args.method_id);
493}
494
495fn encode_channel_open(buf: &mut BytesMut) {
496 buf.put_u8(0);
498}
499
500fn encode_channel_open_ok(buf: &mut BytesMut) {
501 buf.put_u32(0);
503}
504
505fn encode_channel_close(mut buf: &mut BytesMut, args: ChannelCloseArgs) {
506 buf.put_u16(args.code);
507 encode_short_string(&mut buf, args.text);
508 buf.put_u16(args.class_id);
509 buf.put_u16(args.method_id);
510}
511
512fn encode_exchange_declare(mut buf: &mut BytesMut, args: ExchangeDeclareArgs) {
513 buf.put_u16(0);
514 encode_short_string(&mut buf, args.exchange_name);
515 encode_short_string(&mut buf, args.exchange_type);
516 buf.put_u8(args.flags.bits());
517 encode_empty_field_table(&mut buf);
518}
519
520fn encode_exchange_bind(mut buf: &mut BytesMut, args: ExchangeBindArgs) {
521 buf.put_u16(0);
522 encode_short_string(&mut buf, args.destination);
523 encode_short_string(&mut buf, args.source);
524 encode_short_string(&mut buf, args.routing_key);
525 buf.put_u8(if args.no_wait { 1 } else { 0 });
526 encode_empty_field_table(&mut buf);
527}
528
529fn encode_queue_declare(mut buf: &mut BytesMut, args: QueueDeclareArgs) {
530 buf.put_u16(0);
531 encode_short_string(&mut buf, args.name);
532 buf.put_u8(args.flags.bits());
533 encode_empty_field_table(&mut buf);
534}
535
536fn encode_queue_declare_ok(mut buf: &mut BytesMut, args: QueueDeclareOkArgs) {
537 encode_short_string(&mut buf, args.name);
538 buf.put_u32(args.message_count);
539 buf.put_u32(args.consumer_count);
540}
541
542fn encode_queue_bind(mut buf: &mut BytesMut, args: QueueBindArgs) {
543 buf.put_u16(0);
544 encode_short_string(&mut buf, args.queue_name);
545 encode_short_string(&mut buf, args.exchange_name);
546 encode_short_string(&mut buf, args.routing_key);
547 buf.put_u8(if args.no_wait { 1 } else { 0 });
548 encode_empty_field_table(&mut buf);
549}
550
551fn encode_basic_consume(mut buf: &mut BytesMut, args: BasicConsumeArgs) {
552 buf.put_u16(0);
553 encode_short_string(&mut buf, args.queue);
554 encode_short_string(&mut buf, args.consumer_tag);
555 buf.put_u8(args.flags.bits());
556 encode_empty_field_table(&mut buf);
557}
558
559fn encode_basic_consume_ok(mut buf: &mut BytesMut, args: BasicConsumeOkArgs) {
560 encode_short_string(&mut buf, args.consumer_tag);
561}
562
563fn encode_basic_deliver(mut buf: &mut BytesMut, args: BasicDeliverArgs) {
564 encode_short_string(&mut buf, args.consumer_tag);
565 buf.put_u64(args.delivery_tag);
566 buf.put_u8(if args.redelivered { 1 } else { 0 });
567 encode_short_string(&mut buf, args.exchange_name);
568 encode_short_string(&mut buf, args.routing_key);
569}
570
571 fn encode_basic_publish(mut buf: &mut BytesMut, args: BasicPublishArgs) {
573 buf.put_u16(0);
574 encode_short_string(&mut buf, args.exchange_name);
575 encode_short_string(&mut buf, args.routing_key);
576 buf.put_u8(args.flags.bits());
577}
578
579fn encode_content_header_frame(buf: &mut BytesMut, hf: ContentHeaderFrame) {
580 buf.put_u8(2u8);
581 buf.put_u16(hf.channel);
582
583 let mut fr_buf = BytesMut::with_capacity(4096);
584 fr_buf.put_u16(hf.class_id);
585 fr_buf.put_u16(hf.weight);
586 fr_buf.put_u64(hf.body_size);
587 fr_buf.put_u16(hf.prop_flags);
588
589 buf.put_u32(fr_buf.len() as u32);
592 buf.put(fr_buf);
593 buf.put_u8(0xCE);
594}
595
596fn encode_content_body_frame(buf: &mut BytesMut, bf: ContentBodyFrame) {
597 buf.put_u8(3u8);
598 buf.put_u16(bf.channel);
599
600 let mut fr_buf = BytesMut::with_capacity(bf.body.len());
601 fr_buf.put(bf.body.as_slice());
602
603 buf.put_u32(fr_buf.len() as u32);
604 buf.put(fr_buf);
605 buf.put_u8(0xCE);
606}
607
608fn encode_heartbeat_frame(buf: &mut BytesMut, channel: Channel) {
609 buf.put_u16(channel);
610 buf.put_u32(0);
611 buf.put_u8(0xCE);
612}
613
614fn encode_short_string(buf: &mut BytesMut, s: String) {
615 buf.put_u8(s.len() as u8);
617 buf.put(s.as_bytes());
618}
619
620fn encode_long_string(buf: &mut BytesMut, s: String) {
621 buf.put_u32(s.len() as u32);
622 buf.put(s.as_bytes());
623}
624
625fn encode_empty_field_table(buf: &mut BytesMut) {
626 buf.put_u32(0);
627}
628
629fn encode_field_table(mut buf: &mut BytesMut, ft: Option<HashMap<String, AMQPFieldValue>>) {
630 match ft {
631 None => buf.put_u32(0),
632 Some(t) => encode_field_table2(&mut buf, t),
633 }
634}
635
636fn encode_field_table2(buf: &mut BytesMut, ft: HashMap<String, AMQPFieldValue>) {
637 let mut ft_buf = BytesMut::with_capacity(4096);
638
639 for (name, value) in ft {
640 encode_short_string(&mut ft_buf, name);
641
642 match value {
643 AMQPFieldValue::Bool(v) => {
644 ft_buf.put_u8(b't');
645 ft_buf.put_u8(if v { 1 } else { 0 });
646 }
647 AMQPFieldValue::LongString(v) => {
648 ft_buf.put_u8(b'S');
649 ft_buf.put_u32(v.len() as u32);
650 ft_buf.put(v.as_bytes());
651 }
652 AMQPFieldValue::EmptyFieldTable => encode_empty_field_table(&mut ft_buf),
653 AMQPFieldValue::FieldTable(v) => {
654 ft_buf.put_u8(b'F');
655
656 encode_field_table2(&mut ft_buf, *v);
658 }
659 }
660 }
661
662 buf.put_u32(ft_buf.len() as u32);
663 buf.put(ft_buf);
664}
665
666#[allow(dead_code)]
667fn dump(buf: &BytesMut) {
668 let mut cloned = buf.clone();
669 let mut i: usize = 0;
670 let mut text: Vec<u8> = Vec::new();
671
672 println!("---");
673
674 while cloned.has_remaining() {
675 let b = cloned.get_u8();
676
677 print!("{:02X} ", b);
678
679 if (b as char).is_alphanumeric() {
680 text.push(b);
681 } else {
682 text.push(b'.');
683 }
684
685 i += 1;
686
687 if i % 16 == 0 {
688 println!("{}", std::str::from_utf8(&text).unwrap_or_default());
689 text.clear();
690 }
691 }
692
693 println!("---");
694}