amqpr_codec/frame/content_header/
decoder.rs

1use bytes::{BytesMut, BigEndian, Buf};
2
3use std::io::Cursor;
4use std::collections::HashMap;
5
6use super::{ContentHeaderPayload, Properties};
7use args::{AmqpString, FieldArgument};
8
9const NUM_OF_PROPERTY: usize = 13;
10
11pub fn decode_payload(payload: &mut BytesMut) -> ContentHeaderPayload {
12    let properties = decode_properties(payload.split_off(12));
13
14    let mut others_cursor = Cursor::new(payload.take());
15
16    let class_id = others_cursor.get_u16::<BigEndian>();
17    assert!(class_id != 0);
18
19    let weight = others_cursor.get_u16::<BigEndian>(); // must be zero
20    assert_eq!(weight, 0);
21
22    let body_size = others_cursor.get_u64::<BigEndian>();
23    drop(others_cursor);
24
25    let payload = ContentHeaderPayload {
26        class_id: class_id,
27        body_size: body_size,
28        properties: properties,
29    };
30
31    payload
32}
33
34
35
36fn decode_properties(mut bytes: BytesMut) -> Properties {
37    let mut flags = Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>();
38
39    let mut ps = Properties::new();
40
41    for i in 0..NUM_OF_PROPERTY {
42        if flags == 0 {
43            break;
44        }
45        if check_flag_n(&flags, i) {
46            remove_flag_n(&mut flags, i);
47            set_property_n(&mut ps, i, &mut bytes);
48        }
49    }
50
51    ps
52}
53
54
55fn check_flag_n(flags: &u16, i: usize) -> bool {
56    flags & (1u16 << (15 - i)) != 0
57}
58
59
60fn remove_flag_n(flags: &mut u16, i: usize) {
61    *flags -= 1u16 << (15 - i);
62}
63
64fn set_property_n(ps: &mut Properties, i: usize, bytes: &mut BytesMut) {
65    match i {
66        0 => ps.content_type = Some(decode_short_str(bytes)),
67        1 => ps.content_encoding = Some(decode_short_str(bytes)),
68        2 => ps.headers = Some(decode_field_table(bytes)),
69        3 => ps.delivery_mode = Some(decode_u8(bytes)),
70        4 => ps.priority = Some(decode_u8(bytes)),
71        5 => ps.correlation_id = Some(decode_short_str(bytes)),
72        6 => ps.reply_to = Some(decode_short_str(bytes)),
73        7 => ps.expiration = Some(decode_short_str(bytes)),
74        8 => ps.message_id = Some(decode_short_str(bytes)),
75        9 => ps.timestamp = Some(decode_i64(bytes)),
76        10 => ps.type_ = Some(decode_short_str(bytes)),
77        11 => ps.user_id = Some(decode_short_str(bytes)),
78        12 => ps.app_id = Some(decode_short_str(bytes)),
79        _ => unreachable!(),
80    }
81}
82
83
84fn decode_short_str(bytes: &mut BytesMut) -> AmqpString {
85    let len = Cursor::new(bytes.split_to(1)).get_u8();
86    AmqpString(bytes.split_to(len as usize).freeze())
87}
88
89
90fn decode_u8(bytes: &mut BytesMut) -> u8 {
91    Cursor::new(bytes.split_to(1)).get_u8()
92}
93
94
95fn decode_i64(bytes: &mut BytesMut) -> i64 {
96    Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>()
97}
98
99
100fn decode_field_table(bytes: &mut BytesMut) -> HashMap<AmqpString, FieldArgument> {
101    ::frame::method::decoder::decode_field_table(bytes)
102}