amqpr_codec/frame/content_header/
decoder.rs1use bytes::{BytesMut, BigEndian, Buf};
2
3use std::io::Cursor;
4use std::collections::HashMap;
5
6use super::{ContentHeaderPayload, Properties};
7use args::{AmqpString, FieldArgument};
8
/// Number of property slots `decode_properties` walks through.
///
/// Must stay in sync with `set_property_n`, whose match arms cover the
/// indices `0..=12` and treat anything else as unreachable.
const NUM_OF_PROPERTY: usize = 13;
10
11pub fn decode_payload(payload: &mut BytesMut) -> ContentHeaderPayload {
12 let properties = decode_properties(payload.split_off(12));
13
14 let mut others_cursor = Cursor::new(payload.take());
15
16 let class_id = others_cursor.get_u16::<BigEndian>();
17 assert!(class_id != 0);
18
19 let weight = others_cursor.get_u16::<BigEndian>(); assert_eq!(weight, 0);
21
22 let body_size = others_cursor.get_u64::<BigEndian>();
23 drop(others_cursor);
24
25 let payload = ContentHeaderPayload {
26 class_id: class_id,
27 body_size: body_size,
28 properties: properties,
29 };
30
31 payload
32}
33
34
35
36fn decode_properties(mut bytes: BytesMut) -> Properties {
37 let mut flags = Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>();
38
39 let mut ps = Properties::new();
40
41 for i in 0..NUM_OF_PROPERTY {
42 if flags == 0 {
43 break;
44 }
45 if check_flag_n(&flags, i) {
46 remove_flag_n(&mut flags, i);
47 set_property_n(&mut ps, i, &mut bytes);
48 }
49 }
50
51 ps
52}
53
54
/// Returns `true` when the flag for property `i` is set in `flags`.
///
/// Property `0` maps to the most significant bit (bit 15), property `1` to
/// bit 14, and so on.
fn check_flag_n(flags: &u16, i: usize) -> bool {
    let mask = 1u16 << (15 - i);
    (*flags & mask) == mask
}
58
59
/// Clears the flag for property `i` in `flags`.
///
/// Property `0` maps to the most significant bit (bit 15), property `1` to
/// bit 14, and so on. Clearing a flag that is not set leaves `flags`
/// unchanged.
fn remove_flag_n(flags: &mut u16, i: usize) {
    // Mask the bit out instead of subtracting it: subtraction underflows
    // (panic in debug builds, wrap-around in release) when the bit is not set.
    *flags &= !(1u16 << (15 - i));
}
63
64fn set_property_n(ps: &mut Properties, i: usize, bytes: &mut BytesMut) {
65 match i {
66 0 => ps.content_type = Some(decode_short_str(bytes)),
67 1 => ps.content_encoding = Some(decode_short_str(bytes)),
68 2 => ps.headers = Some(decode_field_table(bytes)),
69 3 => ps.delivery_mode = Some(decode_u8(bytes)),
70 4 => ps.priority = Some(decode_u8(bytes)),
71 5 => ps.correlation_id = Some(decode_short_str(bytes)),
72 6 => ps.reply_to = Some(decode_short_str(bytes)),
73 7 => ps.expiration = Some(decode_short_str(bytes)),
74 8 => ps.message_id = Some(decode_short_str(bytes)),
75 9 => ps.timestamp = Some(decode_i64(bytes)),
76 10 => ps.type_ = Some(decode_short_str(bytes)),
77 11 => ps.user_id = Some(decode_short_str(bytes)),
78 12 => ps.app_id = Some(decode_short_str(bytes)),
79 _ => unreachable!(),
80 }
81}
82
83
84fn decode_short_str(bytes: &mut BytesMut) -> AmqpString {
85 let len = Cursor::new(bytes.split_to(1)).get_u8();
86 AmqpString(bytes.split_to(len as usize).freeze())
87}
88
89
90fn decode_u8(bytes: &mut BytesMut) -> u8 {
91 Cursor::new(bytes.split_to(1)).get_u8()
92}
93
94
95fn decode_i64(bytes: &mut BytesMut) -> i64 {
96 Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>()
97}
98
99
100fn decode_field_table(bytes: &mut BytesMut) -> HashMap<AmqpString, FieldArgument> {
101 ::frame::method::decoder::decode_field_table(bytes)
102}