1use crate::frame::*;
2use bytes::{Buf, BufMut, BytesMut};
3use std::collections::HashMap;
4use tokio_util::codec::{Decoder, Encoder};
5
6const FRAME_METHOD_FRAME: u8 = 0x01;
7const FRAME_CONTENT_HEADER: u8 = 0x02;
8const FRAME_CONTENT_BODY: u8 = 0x03;
9const FRAME_HEARTBEAT: u8 = 0x08;
10const FRAME_AMQP_VERSION: u8 = 0x41;
11
12pub struct AMQPCodec {}
14
15#[allow(clippy::large_enum_variant)]
16#[derive(Debug)]
17pub enum Frame {
18 Frame(AMQPFrame),
19 Frames(Vec<AMQPFrame>),
20}
21
22impl Encoder<Frame> for AMQPCodec {
23 type Error = std::io::Error;
24
25 fn encode(&mut self, event: Frame, buf: &mut BytesMut) -> Result<(), Self::Error> {
26 match event {
27 Frame::Frame(frame) => encode_amqp_frame(buf, frame),
28 Frame::Frames(frames) => {
29 for frame in frames {
30 encode_amqp_frame(buf, frame);
31 }
32 }
33 }
34
35 Ok(())
36 }
37}
38
39impl Decoder for AMQPCodec {
40 type Item = Frame;
41 type Error = std::io::Error;
42
43 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
45 if src.len() < 7 || !is_full_frame(src) {
46 return Ok(None);
47 }
48
49 match src.get_u8() {
50 FRAME_METHOD_FRAME => {
51 let channel = src.get_u16();
52 let frame_len = src.get_u32() as usize;
54
55 if src.len() < frame_len + 1 {
58 unreachable!();
59 }
60 let mut frame_buf = src.split_to(frame_len);
61 let frame = decode_method_frame(&mut frame_buf, channel);
62
63 let _frame_separator = src.get_u8();
64
65 Ok(Some(Frame::Frame(frame)))
66 }
67 FRAME_CONTENT_HEADER => {
68 let channel = src.get_u16();
69 let frame_len = src.get_u32() as usize;
70
71 let mut frame_buf = src.split_to(frame_len);
72 let frame = decode_content_header_frame(&mut frame_buf, channel);
73
74 let _frame_separator = src.get_u8();
75
76 Ok(Some(Frame::Frame(frame)))
77 }
78 FRAME_CONTENT_BODY => {
79 let channel = src.get_u16();
80 let body_len = src.get_u32();
81 let bytes = src.split_to(body_len as usize);
82
83 let _frame_separator = src.get_u8();
84
85 let frame = AMQPFrame::ContentBody(ContentBodyFrame {
87 channel,
88 body: bytes.to_vec(),
89 });
90
91 Ok(Some(Frame::Frame(frame)))
92 }
93 FRAME_HEARTBEAT => {
94 let channel = src.get_u16();
95 let len = src.get_u32();
96 let _ = src.split_to(len as usize);
97
98 let _frame_separator = src.get_u8();
99
100 Ok(Some(Frame::Frame(AMQPFrame::Heartbeat(channel))))
101 }
102 FRAME_AMQP_VERSION => {
103 let mut head = [0u8; 7];
104 src.copy_to_slice(&mut head);
105
106 Ok(Some(Frame::Frame(AMQPFrame::Header)))
109 }
110 f => Err(std::io::Error::new(
111 std::io::ErrorKind::Other,
112 format!("Unknown frame {}", f),
113 )),
114 }
115 }
116}
117
118fn is_full_frame(src: &BytesMut) -> bool {
121 match src[0] {
122 FRAME_AMQP_VERSION => src.len() >= 8,
123 _ => {
124 let mut bs = [0u8; 4];
125 bs.copy_from_slice(&src[3..7]);
126
127 let len = u32::from_be_bytes(bs) as usize;
128
129 src.len() >= len + 8
130 }
131 }
132}
133
134fn decode_method_frame(src: &mut BytesMut, channel: u16) -> AMQPFrame {
136 let class_method = src.get_u32();
137
138 let method_frame_args = match class_method {
139 CONNECTION_START => decode_connection_start(src),
140 CONNECTION_START_OK => decode_connection_start_ok(src),
141 CONNECTION_TUNE => decode_connection_tune(src),
142 CONNECTION_TUNE_OK => decode_connection_tune_ok(src),
143 CONNECTION_OPEN => decode_connection_open(src),
144 CONNECTION_OPEN_OK => decode_connection_open_ok(src),
145 CONNECTION_CLOSE => decode_connection_close(src),
146 CONNECTION_CLOSE_OK => MethodFrameArgs::ConnectionCloseOk,
147 CHANNEL_OPEN => decode_channel_open(src),
148 CHANNEL_OPEN_OK => decode_channel_open_ok(src),
149 CHANNEL_CLOSE => decode_channel_close(src),
150 CHANNEL_CLOSE_OK => MethodFrameArgs::ChannelCloseOk,
151 EXCHANGE_DECLARE => decode_exchange_declare(src),
152 EXCHANGE_DECLARE_OK => MethodFrameArgs::ExchangeDeclareOk,
153 EXCHANGE_DELETE => decode_exchange_delete(src),
154 EXCHANGE_DELETE_OK => MethodFrameArgs::ExchangeDeleteOk,
155 QUEUE_DECLARE => decode_queue_declare(src),
156 QUEUE_DECLARE_OK => decode_queue_declare_ok(src),
157 QUEUE_BIND => decode_queue_bind(src),
158 QUEUE_BIND_OK => MethodFrameArgs::QueueBindOk,
159 QUEUE_PURGE => decode_queue_purge(src),
160 QUEUE_PURGE_OK => decode_queue_purge_ok(src),
161 QUEUE_DELETE => decode_queue_delete(src),
162 QUEUE_DELETE_OK => decode_queue_delete_ok(src),
163 QUEUE_UNBIND => decode_queue_unbind(src),
164 QUEUE_UNBIND_OK => MethodFrameArgs::QueueUnbindOk,
165 BASIC_CONSUME => decode_basic_consume(src),
166 BASIC_CONSUME_OK => decode_basic_consume_ok(src),
167 BASIC_CANCEL => decode_basic_cancel(src),
168 BASIC_CANCEL_OK => decode_basic_cancel_ok(src),
169 BASIC_GET => decode_basic_get(src),
170 BASIC_GET_OK => decode_basic_get_ok(src),
171 BASIC_GET_EMPTY => decode_basic_get_empty(src),
172 BASIC_PUBLISH => decode_basic_publish(src),
173 BASIC_RETURN => decode_basic_return(src),
174 BASIC_DELIVER => decode_basic_deliver(src),
175 BASIC_ACK => decode_basic_ack(src),
176 BASIC_REJECT => decode_basic_reject(src),
177 CONFIRM_SELECT => decode_confirm_select(src),
178 CONFIRM_SELECT_OK => MethodFrameArgs::ConfirmSelectOk,
179 _ => unimplemented!("{:08X}", class_method),
180 };
181
182 AMQPFrame::Method(channel, class_method, method_frame_args)
183}
184
185fn decode_connection_start(src: &mut BytesMut) -> MethodFrameArgs {
186 let args = ConnectionStartArgs {
187 version_major: src.get_u8(),
188 version_minor: src.get_u8(),
189 properties: decode_field_table(src),
190 mechanisms: decode_long_string(src),
191 locales: decode_long_string(src),
192 ..Default::default()
193 };
194
195 MethodFrameArgs::ConnectionStart(args)
202}
203
204fn decode_connection_start_ok(src: &mut BytesMut) -> MethodFrameArgs {
205 let args = ConnectionStartOkArgs {
206 properties: decode_field_table(src),
207 mechanism: decode_short_string(src),
208 response: decode_long_string(src),
209 locale: decode_short_string(src),
210 ..Default::default()
211 };
212
213 MethodFrameArgs::ConnectionStartOk(args)
216}
217
218fn decode_connection_tune(src: &mut BytesMut) -> MethodFrameArgs {
219 let args = ConnectionTuneArgs {
220 channel_max: src.get_u16(),
221 frame_max: src.get_u32(),
222 heartbeat: src.get_u16(),
223 };
224
225 MethodFrameArgs::ConnectionTune(args)
226}
227
228fn decode_connection_tune_ok(src: &mut BytesMut) -> MethodFrameArgs {
229 let args = ConnectionTuneOkArgs {
230 channel_max: src.get_u16(),
231 frame_max: src.get_u32(),
232 heartbeat: src.get_u16(),
233 };
234
235 MethodFrameArgs::ConnectionTuneOk(args)
236}
237
238fn decode_connection_open(src: &mut BytesMut) -> MethodFrameArgs {
239 let virtual_host = decode_short_string(src);
240 let _reserved = decode_short_string(src);
241 let flags = src.get_u8();
242
243 MethodFrameArgs::ConnectionOpen(ConnectionOpenArgs {
244 virtual_host,
245 insist: flags & 0x01 != 0,
246 })
247}
248
249fn decode_connection_open_ok(src: &mut BytesMut) -> MethodFrameArgs {
250 let _ = decode_short_string(src);
251
252 MethodFrameArgs::ConnectionOpenOk
253}
254
255fn decode_connection_close(src: &mut BytesMut) -> MethodFrameArgs {
256 let args = ConnectionCloseArgs {
257 code: src.get_u16(),
258 text: decode_short_string(src),
259 class_id: src.get_u16(),
260 method_id: src.get_u16(),
261 };
262
263 MethodFrameArgs::ConnectionClose(args)
264}
265
266fn decode_channel_open(src: &mut BytesMut) -> MethodFrameArgs {
267 let _ = decode_short_string(src);
268
269 MethodFrameArgs::ChannelOpen
270}
271
272fn decode_channel_open_ok(src: &mut BytesMut) -> MethodFrameArgs {
273 let _ = decode_long_string(src);
274
275 MethodFrameArgs::ChannelOpenOk
276}
277
278fn decode_channel_close(src: &mut BytesMut) -> MethodFrameArgs {
279 let args = ChannelCloseArgs {
280 code: src.get_u16(),
281 text: decode_short_string(src),
282 class_id: src.get_u16(),
283 method_id: src.get_u16(),
284 };
285
286 MethodFrameArgs::ChannelClose(args)
287}
288
289fn decode_exchange_declare(src: &mut BytesMut) -> MethodFrameArgs {
290 let mut args = ExchangeDeclareArgs::default();
291 let _ = src.get_u16();
292 args.exchange_name = decode_short_string(src);
293 args.exchange_type = decode_short_string(src);
294 args.flags = ExchangeDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
295 args.args = decode_field_table(src);
296
297 MethodFrameArgs::ExchangeDeclare(args)
298}
299
300fn decode_exchange_delete(src: &mut BytesMut) -> MethodFrameArgs {
301 let mut args = ExchangeDeleteArgs::default();
302 let _ = src.get_u16();
303 args.exchange_name = decode_short_string(src);
304 args.flags = ExchangeDeleteFlags::from_bits(src.get_u8()).unwrap_or_default();
305
306 MethodFrameArgs::ExchangeDelete(args)
307}
308
309fn decode_queue_declare(src: &mut BytesMut) -> MethodFrameArgs {
310 let mut args = QueueDeclareArgs::default();
311 let _ = src.get_u16();
312 args.name = decode_short_string(src);
313 args.flags = QueueDeclareFlags::from_bits(src.get_u8()).unwrap_or_default();
314 args.args = decode_field_table(src);
315
316 MethodFrameArgs::QueueDeclare(args)
317}
318
319fn decode_queue_declare_ok(src: &mut BytesMut) -> MethodFrameArgs {
320 let args = QueueDeclareOkArgs {
321 name: decode_short_string(src),
322 message_count: src.get_u32(),
323 consumer_count: src.get_u32(),
324 };
325
326 MethodFrameArgs::QueueDeclareOk(args)
327}
328
329fn decode_queue_bind(src: &mut BytesMut) -> MethodFrameArgs {
330 let mut args = QueueBindArgs::default();
331 let _ = src.get_u16();
332 args.queue_name = decode_short_string(src);
333 args.exchange_name = decode_short_string(src);
334 args.routing_key = decode_short_string(src);
335
336 args.no_wait = src.get_u8() != 0;
337 args.args = decode_field_table(src);
338
339 MethodFrameArgs::QueueBind(args)
340}
341
342fn decode_queue_purge(src: &mut BytesMut) -> MethodFrameArgs {
343 let mut args = QueuePurgeArgs::default();
344 let _ = src.get_u16();
345 args.queue_name = decode_short_string(src);
346 args.no_wait = src.get_u8() != 0;
347
348 MethodFrameArgs::QueuePurge(args)
349}
350
351fn decode_queue_purge_ok(src: &mut BytesMut) -> MethodFrameArgs {
352 let mut args = QueuePurgeOkArgs::default();
353 args.message_count = src.get_u32();
354
355 MethodFrameArgs::QueuePurgeOk(args)
356}
357
358fn decode_queue_delete(src: &mut BytesMut) -> MethodFrameArgs {
359 let mut args = QueueDeleteArgs::default();
360 let _ = src.get_u16();
361 args.queue_name = decode_short_string(src);
362 args.flags = QueueDeleteFlags::from_bits(src.get_u8()).unwrap_or_default();
363
364 MethodFrameArgs::QueueDelete(args)
365}
366
367fn decode_queue_delete_ok(src: &mut BytesMut) -> MethodFrameArgs {
368 let args = QueueDeleteOkArgs {
369 message_count: src.get_u32(),
370 };
371
372 MethodFrameArgs::QueueDeleteOk(args)
373}
374
375fn decode_queue_unbind(src: &mut BytesMut) -> MethodFrameArgs {
376 let mut args = QueueUnbindArgs::default();
377 let _ = src.get_u16();
378 args.queue_name = decode_short_string(src);
379 args.exchange_name = decode_short_string(src);
380 args.routing_key = decode_short_string(src);
381 args.args = decode_field_table(src);
382
383 MethodFrameArgs::QueueUnbind(args)
384}
385
386fn decode_basic_consume(src: &mut BytesMut) -> MethodFrameArgs {
387 let mut args = BasicConsumeArgs::default();
388 let _ = src.get_u16();
389 args.queue = decode_short_string(src);
390 args.consumer_tag = decode_short_string(src);
391 args.flags = BasicConsumeFlags::from_bits(src.get_u8()).unwrap_or_default();
392 args.args = decode_field_table(src);
393
394 MethodFrameArgs::BasicConsume(args)
395}
396
397fn decode_basic_consume_ok(src: &mut BytesMut) -> MethodFrameArgs {
398 let args = BasicConsumeOkArgs {
399 consumer_tag: decode_short_string(src),
400 };
401
402 MethodFrameArgs::BasicConsumeOk(args)
403}
404
405fn decode_basic_cancel(src: &mut BytesMut) -> MethodFrameArgs {
406 let args = BasicCancelArgs {
407 consumer_tag: decode_short_string(src),
408 no_wait: src.get_u8() != 0,
409 };
410
411 MethodFrameArgs::BasicCancel(args)
412}
413
414fn decode_basic_cancel_ok(src: &mut BytesMut) -> MethodFrameArgs {
415 let args = BasicCancelOkArgs {
416 consumer_tag: decode_short_string(src),
417 };
418
419 MethodFrameArgs::BasicCancelOk(args)
420}
421
422fn decode_basic_get(src: &mut BytesMut) -> MethodFrameArgs {
423 let mut args = BasicGetArgs::default();
424 let _ = src.get_u16();
425
426 args.queue = decode_short_string(src);
427 args.no_ack = src.get_u8() != 0;
428
429 MethodFrameArgs::BasicGet(args)
430}
431
432fn decode_basic_get_ok(src: &mut BytesMut) -> MethodFrameArgs {
433 let mut args = BasicGetOkArgs::default();
434 args.delivery_tag = src.get_u64();
435 args.redelivered = src.get_u8() != 0;
436 args.exchange_name = decode_short_string(src);
437 args.routing_key = decode_short_string(src);
438 args.message_count = src.get_u32();
439
440 MethodFrameArgs::BasicGetOk(args)
441}
442
443fn decode_basic_get_empty(src: &mut BytesMut) -> MethodFrameArgs {
444 let _ = src.get_u8();
445
446 MethodFrameArgs::BasicGetEmpty
447}
448
449fn decode_basic_publish(src: &mut BytesMut) -> MethodFrameArgs {
450 let mut args = BasicPublishArgs::default();
451 let _ = src.get_u16();
452 args.exchange_name = decode_short_string(src);
453 args.routing_key = decode_short_string(src);
454 args.flags = BasicPublishFlags::from_bits(src.get_u8()).unwrap_or_default();
455
456 MethodFrameArgs::BasicPublish(args)
457}
458
459fn decode_basic_return(src: &mut BytesMut) -> MethodFrameArgs {
460 let args = BasicReturnArgs {
461 reply_code: src.get_u16(),
462 reply_text: decode_short_string(src),
463 exchange_name: decode_short_string(src),
464 routing_key: decode_short_string(src),
465 };
466
467 MethodFrameArgs::BasicReturn(args)
468}
469
470fn decode_basic_deliver(src: &mut BytesMut) -> MethodFrameArgs {
471 let args = BasicDeliverArgs {
472 consumer_tag: decode_short_string(src),
473 delivery_tag: src.get_u64(),
474 redelivered: src.get_u8() != 0,
475 exchange_name: decode_short_string(src),
476 routing_key: decode_short_string(src),
477 };
478
479 MethodFrameArgs::BasicDeliver(args)
480}
481
482fn decode_basic_ack(src: &mut BytesMut) -> MethodFrameArgs {
483 let args = BasicAckArgs {
484 delivery_tag: src.get_u64(),
485 multiple: src.get_u8() != 0,
486 };
487
488 MethodFrameArgs::BasicAck(args)
489}
490
491fn decode_basic_reject(src: &mut BytesMut) -> MethodFrameArgs {
492 let args = BasicRejectArgs {
493 delivery_tag: src.get_u64(),
494 requeue: src.get_u8() != 0,
495 };
496
497 MethodFrameArgs::BasicReject(args)
498}
499
500fn decode_confirm_select(src: &mut BytesMut) -> MethodFrameArgs {
501 let args = ConfirmSelectArgs {
502 no_wait: src.get_u8() != 0,
503 };
504
505 MethodFrameArgs::ConfirmSelect(args)
506}
507
508fn decode_content_header_frame(src: &mut BytesMut, channel: u16) -> AMQPFrame {
509 let class_id = src.get_u16();
510 let weight = src.get_u16();
511 let body_size = src.get_u64();
512 let property_flags = HeaderPropertyFlags::from_bits(src.get_u16()).unwrap_or_default();
513
514 let content_type = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CONTENT_TYPE);
515 let content_encoding = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CONTENT_ENCODING);
516 let headers = decode_field_table_flag(src, property_flags, HeaderPropertyFlags::HEADERS);
517 let delivery_mode = if property_flags.contains(HeaderPropertyFlags::DELIVERY_MODE) {
518 Some(src.get_u8())
519 } else {
520 None
521 };
522 let priority = if property_flags.contains(HeaderPropertyFlags::PRIORITY) {
523 Some(src.get_u8())
524 } else {
525 None
526 };
527 let correlation_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CORRELATION_ID);
528 let reply_to = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::REPLY_TO);
529 let expiration = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::EXPIRATION);
530 let message_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::MESSAGE_ID);
531 let timestamp = if property_flags.contains(HeaderPropertyFlags::TIMESTAMP) {
532 Some(src.get_u64())
533 } else {
534 None
535 };
536 let message_type = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::MESSAGE_TYPE);
537 let user_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::USER_ID);
538 let app_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::APP_ID);
539 let cluster_id = decode_short_string_flag(src, property_flags, HeaderPropertyFlags::CLUSTER_ID);
540
541 AMQPFrame::ContentHeader(ContentHeaderFrame {
542 channel,
543 class_id,
544 weight,
545 body_size,
546 prop_flags: property_flags,
547 cluster_id,
548 app_id,
549 user_id,
550 message_type,
551 timestamp,
552 message_id,
553 expiration,
554 reply_to,
555 correlation_id,
556 priority,
557 delivery_mode,
558 headers,
559 content_encoding,
560 content_type,
561 })
562}
563
564fn decode_short_string_flag(
565 src: &mut BytesMut,
566 flags: HeaderPropertyFlags,
567 flag: HeaderPropertyFlags,
568) -> Option<String> {
569 if flags.contains(flag) {
570 Some(decode_short_string(src))
571 } else {
572 None
573 }
574}
575
576fn decode_field_table_flag(
577 src: &mut BytesMut,
578 flags: HeaderPropertyFlags,
579 flag: HeaderPropertyFlags,
580) -> Option<FieldTable> {
581 if flags.contains(flag) {
582 decode_field_table(src)
583 } else {
584 None
585 }
586}
587
588fn decode_value(buf: &mut BytesMut) -> AMQPFieldValue {
589 match buf.get_u8() {
590 b't' => {
591 let bool_value = buf.get_u8() != 0;
592
593 AMQPFieldValue::Bool(bool_value)
594 }
595 b'S' => {
596 let string_value = decode_long_string(buf);
597
598 AMQPFieldValue::LongString(string_value)
599 }
600 b'F' => match decode_field_table(buf) {
601 None => AMQPFieldValue::EmptyFieldTable,
602 Some(table) => AMQPFieldValue::FieldTable(Box::new(table)),
603 },
604 t => panic!("Unknown type {}", t),
605 }
606}
607
608fn decode_short_string(buf: &mut BytesMut) -> String {
609 let len = buf.get_u8() as usize;
610 let sb = buf.split_to(len);
611
612 String::from_utf8(sb.to_vec()).unwrap()
613}
614
615fn decode_long_string(buf: &mut BytesMut) -> String {
616 let len = buf.get_u32() as usize;
617 let sb = buf.split_to(len);
618
619 String::from_utf8(sb.to_vec()).unwrap()
620}
621
622fn decode_field_table(buf: &mut BytesMut) -> Option<HashMap<String, AMQPFieldValue>> {
627 let ft_len = buf.get_u32() as usize;
628
629 if ft_len == 0 {
630 return None;
631 }
632
633 let mut ft_buf = buf.split_to(ft_len);
634 let mut table = HashMap::new();
635
636 while ft_buf.has_remaining() {
637 let field_name = decode_short_string(&mut ft_buf);
638 let field_value = decode_value(&mut ft_buf);
639
640 table.insert(field_name, field_value);
641 }
642
643 Some(table)
644}
645
646fn encode_amqp_frame(buf: &mut BytesMut, frame: AMQPFrame) {
647 match frame {
648 AMQPFrame::Header => buf.put(&b"AMQP\x00\x00\x09\x01"[..]),
649
650 AMQPFrame::Method(ch, cm, args) => encode_method_frame(buf, ch, cm, &args),
651
652 AMQPFrame::ContentHeader(header_frame) => encode_content_header_frame(buf, &header_frame),
653
654 AMQPFrame::ContentBody(body_frame) => encode_content_body_frame(buf, &body_frame),
655
656 AMQPFrame::Heartbeat(channel) => encode_heartbeat_frame(buf, channel),
657 }
658}
659
660fn encode_method_frame(buf: &mut BytesMut, channel: Channel, cm: ClassMethod, args: &MethodFrameArgs) {
661 buf.put_u8(1u8);
662 buf.put_u16(channel);
663
664 let mut fr = BytesMut::with_capacity(4096);
665 fr.put_u32(cm);
666
667 match args {
668 MethodFrameArgs::ConnectionStart(args) => encode_connection_start(&mut fr, args),
669 MethodFrameArgs::ConnectionStartOk(args) => encode_connection_start_ok(&mut fr, args),
670 MethodFrameArgs::ConnectionTune(args) => encode_connection_tune(&mut fr, args),
671 MethodFrameArgs::ConnectionTuneOk(args) => encode_connection_tune_ok(&mut fr, args),
672 MethodFrameArgs::ConnectionOpen(args) => encode_connection_open(&mut fr, args),
673 MethodFrameArgs::ConnectionOpenOk => encode_connection_open_ok(&mut fr),
674 MethodFrameArgs::ConnectionClose(args) => encode_connection_close(&mut fr, args),
675 MethodFrameArgs::ConnectionCloseOk => (),
676 MethodFrameArgs::ChannelOpen => encode_channel_open(&mut fr),
677 MethodFrameArgs::ChannelOpenOk => encode_channel_open_ok(&mut fr),
678 MethodFrameArgs::ChannelClose(args) => encode_channel_close(&mut fr, args),
679 MethodFrameArgs::ChannelCloseOk => (),
680 MethodFrameArgs::ExchangeDeclare(args) => encode_exchange_declare(&mut fr, args),
681 MethodFrameArgs::ExchangeDeclareOk => (),
682 MethodFrameArgs::ExchangeDelete(args) => encode_exchange_delete(&mut fr, args),
683 MethodFrameArgs::ExchangeDeleteOk => (),
684 MethodFrameArgs::QueueDeclare(args) => encode_queue_declare(&mut fr, args),
685 MethodFrameArgs::QueueDeclareOk(args) => encode_queue_declare_ok(&mut fr, args),
686 MethodFrameArgs::QueueBind(args) => encode_queue_bind(&mut fr, args),
687 MethodFrameArgs::QueueBindOk => (),
688 MethodFrameArgs::QueuePurge(args) => encode_queue_purge(&mut fr, args),
689 MethodFrameArgs::QueuePurgeOk(args) => encode_queue_purge_ok(&mut fr, args),
690 MethodFrameArgs::QueueDelete(args) => encode_queue_delete(&mut fr, args),
691 MethodFrameArgs::QueueDeleteOk(args) => encode_queue_delete_ok(&mut fr, args),
692 MethodFrameArgs::QueueUnbind(args) => encode_queue_unbind(&mut fr, args),
693 MethodFrameArgs::QueueUnbindOk => (),
694 MethodFrameArgs::BasicConsume(args) => encode_basic_consume(&mut fr, args),
695 MethodFrameArgs::BasicConsumeOk(args) => encode_basic_consume_ok(&mut fr, args),
696 MethodFrameArgs::BasicCancel(args) => encode_basic_cancel(&mut fr, args),
697 MethodFrameArgs::BasicCancelOk(args) => encode_basic_cancel_ok(&mut fr, args),
698 MethodFrameArgs::BasicGet(args) => encode_basic_get(&mut fr, args),
699 MethodFrameArgs::BasicGetOk(args) => encode_basic_get_ok(&mut fr, args),
700 MethodFrameArgs::BasicGetEmpty => encode_basic_get_empty(&mut fr),
701 MethodFrameArgs::BasicPublish(args) => encode_basic_publish(&mut fr, args),
702 MethodFrameArgs::BasicReturn(args) => encode_basic_return(&mut fr, args),
703 MethodFrameArgs::BasicDeliver(args) => encode_basic_deliver(&mut fr, args),
704 MethodFrameArgs::BasicAck(args) => encode_basic_ack(&mut fr, args),
705 MethodFrameArgs::BasicReject(args) => encode_basic_reject(&mut fr, args),
706 MethodFrameArgs::ConfirmSelect(args) => encode_confirm_select(&mut fr, args),
707 MethodFrameArgs::ConfirmSelectOk => (),
708 }
709
710 buf.put_u32(fr.len() as u32);
711 buf.put(fr);
712 buf.put_u8(0xCE);
713}
714
715fn encode_connection_start(buf: &mut BytesMut, args: &ConnectionStartArgs) {
716 buf.put_u8(args.version_major);
717 buf.put_u8(args.version_minor);
718 encode_field_table(buf, args.properties.as_ref());
719 encode_long_string(buf, &args.mechanisms);
720 encode_long_string(buf, &args.locales);
721}
722
723fn encode_connection_start_ok(buf: &mut BytesMut, args: &ConnectionStartOkArgs) {
724 encode_field_table(buf, args.properties.as_ref());
725 encode_short_string(buf, &args.mechanism);
726 encode_long_string(buf, &args.response);
727 encode_short_string(buf, &args.locale);
728}
729
730fn encode_connection_tune(buf: &mut BytesMut, args: &ConnectionTuneArgs) {
731 buf.put_u16(args.channel_max);
732 buf.put_u32(args.frame_max);
733 buf.put_u16(args.heartbeat);
734}
735
736fn encode_connection_tune_ok(buf: &mut BytesMut, args: &ConnectionTuneOkArgs) {
737 buf.put_u16(args.channel_max);
738 buf.put_u32(args.frame_max);
739 buf.put_u16(args.heartbeat);
740}
741
742fn encode_connection_open(buf: &mut BytesMut, args: &ConnectionOpenArgs) {
743 encode_short_string(buf, &args.virtual_host);
744 encode_short_string(buf, "");
745 let mut flags = 0x00;
746
747 if args.insist {
748 flags |= 0x01;
749 }
750
751 buf.put_u8(flags);
752}
753
754fn encode_connection_open_ok(buf: &mut BytesMut) {
755 buf.put_u8(0);
757}
758
759fn encode_connection_close(buf: &mut BytesMut, args: &ConnectionCloseArgs) {
760 buf.put_u16(args.code);
761 encode_short_string(buf, &args.text);
762 buf.put_u16(args.class_id);
763 buf.put_u16(args.method_id);
764}
765
766fn encode_channel_open(buf: &mut BytesMut) {
767 buf.put_u8(0);
769}
770
771fn encode_channel_open_ok(buf: &mut BytesMut) {
772 buf.put_u32(0);
774}
775
776fn encode_channel_close(buf: &mut BytesMut, args: &ChannelCloseArgs) {
777 buf.put_u16(args.code);
778 encode_short_string(buf, &args.text);
779 buf.put_u16(args.class_id);
780 buf.put_u16(args.method_id);
781}
782
783fn encode_exchange_declare(buf: &mut BytesMut, args: &ExchangeDeclareArgs) {
784 buf.put_u16(0);
785 encode_short_string(buf, &args.exchange_name);
786 encode_short_string(buf, &args.exchange_type);
787 buf.put_u8(args.flags.bits());
788 encode_field_table(buf, args.args.as_ref());
789}
790
791fn encode_exchange_delete(buf: &mut BytesMut, args: &ExchangeDeleteArgs) {
792 buf.put_u16(0);
793 encode_short_string(buf, &args.exchange_name);
794 buf.put_u8(args.flags.bits());
795}
796
797fn encode_queue_declare(buf: &mut BytesMut, args: &QueueDeclareArgs) {
798 buf.put_u16(0);
799 encode_short_string(buf, &args.name);
800 buf.put_u8(args.flags.bits());
801 encode_field_table(buf, args.args.as_ref());
802}
803
804fn encode_queue_declare_ok(buf: &mut BytesMut, args: &QueueDeclareOkArgs) {
805 encode_short_string(buf, &args.name);
806 buf.put_u32(args.message_count);
807 buf.put_u32(args.consumer_count);
808}
809
810fn encode_queue_bind(buf: &mut BytesMut, args: &QueueBindArgs) {
811 buf.put_u16(0);
812 encode_short_string(buf, &args.queue_name);
813 encode_short_string(buf, &args.exchange_name);
814 encode_short_string(buf, &args.routing_key);
815 buf.put_u8(if args.no_wait { 1 } else { 0 });
816 encode_field_table(buf, args.args.as_ref());
817}
818
819fn encode_queue_purge(buf: &mut BytesMut, args: &QueuePurgeArgs) {
820 buf.put_u16(0);
821 encode_short_string(buf, &args.queue_name);
822 buf.put_u8(if args.no_wait { 1 } else { 0 });
823}
824
825fn encode_queue_purge_ok(buf: &mut BytesMut, args: &QueuePurgeOkArgs) {
826 buf.put_u32(args.message_count);
827}
828
829fn encode_queue_delete(buf: &mut BytesMut, args: &QueueDeleteArgs) {
830 buf.put_u16(0);
831 encode_short_string(buf, &args.queue_name);
832 buf.put_u8(args.flags.bits());
833}
834
835fn encode_queue_delete_ok(buf: &mut BytesMut, args: &QueueDeleteOkArgs) {
836 buf.put_u32(args.message_count);
837}
838
839fn encode_queue_unbind(buf: &mut BytesMut, args: &QueueUnbindArgs) {
840 buf.put_u16(0);
841 encode_short_string(buf, &args.queue_name);
842 encode_short_string(buf, &args.exchange_name);
843 encode_short_string(buf, &args.routing_key);
844 encode_field_table(buf, args.args.as_ref());
845}
846
847fn encode_basic_consume(buf: &mut BytesMut, args: &BasicConsumeArgs) {
848 buf.put_u16(0);
849 encode_short_string(buf, &args.queue);
850 encode_short_string(buf, &args.consumer_tag);
851 buf.put_u8(args.flags.bits());
852 encode_field_table(buf, args.args.as_ref());
853}
854
855fn encode_basic_consume_ok(buf: &mut BytesMut, args: &BasicConsumeOkArgs) {
856 encode_short_string(buf, &args.consumer_tag);
857}
858
859fn encode_basic_cancel(buf: &mut BytesMut, args: &BasicCancelArgs) {
860 encode_short_string(buf, &args.consumer_tag);
861 buf.put_u8(if args.no_wait { 1 } else { 0 });
862}
863
864fn encode_basic_cancel_ok(buf: &mut BytesMut, args: &BasicCancelOkArgs) {
865 encode_short_string(buf, &args.consumer_tag);
866}
867
868fn encode_basic_get(buf: &mut BytesMut, args: &BasicGetArgs) {
869 buf.put_u16(0);
870 encode_short_string(buf, &args.queue);
871 buf.put_u8(if args.no_ack { 1 } else { 0 });
872}
873
874fn encode_basic_get_ok(buf: &mut BytesMut, args: &BasicGetOkArgs) {
875 buf.put_u64(args.delivery_tag);
876 buf.put_u8(if args.redelivered { 1 } else { 0 });
877 encode_short_string(buf, &args.exchange_name);
878 encode_short_string(buf, &args.routing_key);
879 buf.put_u32(args.message_count);
880}
881
882fn encode_basic_get_empty(buf: &mut BytesMut) {
883 buf.put_u8(0);
884}
885
886fn encode_basic_publish(buf: &mut BytesMut, args: &BasicPublishArgs) {
887 buf.put_u16(0);
888 encode_short_string(buf, &args.exchange_name);
889 encode_short_string(buf, &args.routing_key);
890 buf.put_u8(args.flags.bits());
891}
892
893fn encode_basic_return(buf: &mut BytesMut, args: &BasicReturnArgs) {
894 buf.put_u16(args.reply_code);
895 encode_short_string(buf, &args.reply_text);
896 encode_short_string(buf, &args.exchange_name);
897 encode_short_string(buf, &args.routing_key);
898}
899
900fn encode_basic_deliver(buf: &mut BytesMut, args: &BasicDeliverArgs) {
901 encode_short_string(buf, &args.consumer_tag);
902 buf.put_u64(args.delivery_tag);
903 buf.put_u8(if args.redelivered { 1 } else { 0 });
904 encode_short_string(buf, &args.exchange_name);
905 encode_short_string(buf, &args.routing_key);
906}
907
908fn encode_basic_ack(buf: &mut BytesMut, args: &BasicAckArgs) {
909 buf.put_u64(args.delivery_tag);
910 buf.put_u8(if args.multiple { 1 } else { 0 });
911}
912
913fn encode_basic_reject(buf: &mut BytesMut, args: &BasicRejectArgs) {
914 buf.put_u64(args.delivery_tag);
915 buf.put_u8(if args.requeue { 1 } else { 0 });
916}
917
918fn encode_confirm_select(buf: &mut BytesMut, args: &ConfirmSelectArgs) {
919 buf.put_u8(if args.no_wait { 1 } else { 0 });
920}
921
922fn encode_content_header_frame(buf: &mut BytesMut, hf: &ContentHeaderFrame) {
923 buf.put_u8(2u8);
924 buf.put_u16(hf.channel);
925
926 let mut fr_buf = BytesMut::with_capacity(4096);
927 fr_buf.put_u16(hf.class_id);
928 fr_buf.put_u16(hf.weight);
929 fr_buf.put_u64(hf.body_size);
930 fr_buf.put_u16(hf.prop_flags.bits());
931
932 if let Some(s) = hf.content_type.as_ref() {
933 encode_short_string(&mut fr_buf, s);
934 }
935 if let Some(s) = hf.content_encoding.as_ref() {
936 encode_short_string(&mut fr_buf, s);
937 }
938 if hf.prop_flags.contains(HeaderPropertyFlags::HEADERS) {
939 encode_field_table(&mut fr_buf, hf.headers.as_ref());
940 }
941 if let Some(v) = hf.delivery_mode {
942 fr_buf.put_u8(v);
943 }
944 if let Some(v) = hf.priority {
945 fr_buf.put_u8(v);
946 }
947 if let Some(s) = hf.correlation_id.as_ref() {
948 encode_short_string(&mut fr_buf, s);
949 }
950 if let Some(s) = hf.reply_to.as_ref() {
951 encode_short_string(&mut fr_buf, s);
952 }
953 if let Some(s) = hf.expiration.as_ref() {
954 encode_short_string(&mut fr_buf, s);
955 }
956 if let Some(s) = hf.message_id.as_ref() {
957 encode_short_string(&mut fr_buf, s);
958 }
959 if let Some(v) = hf.timestamp {
960 fr_buf.put_u64(v);
961 }
962 if let Some(s) = hf.message_type.as_ref() {
963 encode_short_string(&mut fr_buf, s);
964 }
965 if let Some(s) = hf.user_id.as_ref() {
966 encode_short_string(&mut fr_buf, s);
967 }
968 if let Some(s) = hf.app_id.as_ref() {
969 encode_short_string(&mut fr_buf, s);
970 }
971 if let Some(s) = hf.cluster_id.as_ref() {
972 encode_short_string(&mut fr_buf, s);
973 }
974
975 buf.put_u32(fr_buf.len() as u32);
976 buf.put(fr_buf);
977 buf.put_u8(0xCE);
978}
979
980fn encode_content_body_frame(buf: &mut BytesMut, bf: &ContentBodyFrame) {
981 buf.put_u8(3u8);
983 buf.put_u16(bf.channel);
984
985 let mut fr_buf = BytesMut::with_capacity(bf.body.len());
986 fr_buf.put(bf.body.as_slice());
987
988 buf.put_u32(fr_buf.len() as u32);
989 buf.put(fr_buf);
990 buf.put_u8(0xCE);
991}
992
993fn encode_heartbeat_frame(buf: &mut BytesMut, channel: Channel) {
994 buf.put_u16(channel);
995 buf.put_u32(0);
996 buf.put_u8(0xCE);
997}
998
999fn encode_short_string(buf: &mut BytesMut, s: &str) {
1000 buf.put_u8(s.len() as u8);
1002 buf.put(s.as_bytes());
1003}
1004
1005fn encode_long_string(buf: &mut BytesMut, s: &str) {
1006 buf.put_u32(s.len() as u32);
1007 buf.put(s.as_bytes());
1008}
1009
1010fn encode_empty_field_table(buf: &mut BytesMut) {
1011 buf.put_u32(0);
1012}
1013
1014fn encode_field_table(buf: &mut BytesMut, ft: Option<&HashMap<String, AMQPFieldValue>>) {
1015 match ft {
1016 None => buf.put_u32(0),
1017 Some(t) => encode_field_table2(buf, t),
1018 }
1019}
1020
1021fn encode_field_table2(buf: &mut BytesMut, ft: &HashMap<String, AMQPFieldValue>) {
1022 let mut ft_buf = BytesMut::with_capacity(4096);
1023
1024 for (name, value) in ft {
1025 encode_short_string(&mut ft_buf, name);
1026
1027 match value {
1028 AMQPFieldValue::Bool(v) => {
1029 ft_buf.put_u8(b't');
1030 ft_buf.put_u8(if *v { 1 } else { 0 });
1031 }
1032 AMQPFieldValue::LongString(v) => {
1033 ft_buf.put_u8(b'S');
1034 ft_buf.put_u32(v.len() as u32);
1035 ft_buf.put(v.as_bytes());
1036 }
1037 AMQPFieldValue::EmptyFieldTable => encode_empty_field_table(&mut ft_buf),
1038 AMQPFieldValue::FieldTable(v) => {
1039 ft_buf.put_u8(b'F');
1040
1041 encode_field_table2(&mut ft_buf, v);
1043 }
1044 }
1045 }
1046
1047 buf.put_u32(ft_buf.len() as u32);
1048 buf.put(ft_buf);
1049}