amqpr_codec/frame/method/
decoder.rs

1use bytes::{BytesMut, BigEndian, Buf};
2
3use std::io::Cursor;
4use std::collections::HashMap;
5
6use frame::method::{MethodPayload, ConnectionClass, ChannelClass, ExchangeClass, QueueClass,
7                    BasicClass, TxClass};
8use args::*;
9
10/// # NOTICE
11/// This method does not check payload length.
12pub fn decode_payload(bytes: &mut BytesMut) -> MethodPayload {
13    let mut cursor = Cursor::new(bytes.split_off(4).freeze());
14
15    let class_id = cursor.get_u16::<BigEndian>();
16    debug!("class_id is {}", class_id);
17
18    let method_id = cursor.get_u16::<BigEndian>();
19    debug!("method_id is {}", method_id);
20
21    drop(cursor);
22
23    match class_id {
24        10 => MethodPayload::Connection(decode_connection_class(method_id, bytes)),
25        20 => MethodPayload::Channel(decode_channel_class(method_id, bytes)),
26        40 => MethodPayload::Exchange(decode_exchange_class(method_id, bytes)),
27        50 => MethodPayload::Queue(decode_queue_class(method_id, bytes)),
28        60 => MethodPayload::Basic(decode_basic_class(method_id, bytes)),
29        90 => MethodPayload::Tx(decode_tx_class(method_id, bytes)),
30        c => unreachable!("Unexpected class id {}", c),
31    }
32}
33
34
35// Decode Connection Class {{{
36fn decode_connection_class(method_id: u16, bytes: &mut BytesMut) -> ConnectionClass {
37    use frame::method::connection::*;
38    use self::ConnectionClass::*;
39    match method_id {
40        10 => Start(StartMethod {
41            version_major: decode_octet(bytes),
42            version_minor: decode_octet(bytes),
43            server_properties: decode_field_table(bytes),
44            mechanisms: decode_long_str(bytes),
45            locales: decode_long_str(bytes),
46        }),
47        20 => Secure(SecureMethod { challenge: decode_long_str(bytes) }),
48        30 => Tune(TuneMethod {
49            channel_max: decode_short(bytes),
50            frame_max: decode_long(bytes),
51            heartbeat: decode_short(bytes),
52        }),
53        41 => OpenOk(OpenOkMethod { reserved1: decode_short_str(bytes) }),
54        50 => Close(CloseMethod {
55            reply_code: decode_short(bytes),
56            reply_text: decode_short_str(bytes),
57            class_id: decode_short(bytes),
58            method_id: decode_short(bytes),
59        }),
60        51 => CloseOk,
61        60 => Blocked(BlockedMethod { reason: decode_short_str(bytes) }),
62        61 => Unblocked,
63        m => unreachable!("Unexpected method id {} in Connection class", m),
64    }
65}
66// }}}
67
68
69// Decode Channel Class {{{
70fn decode_channel_class(method_id: u16, bytes: &mut BytesMut) -> ChannelClass {
71    use frame::method::channel::*;
72    use self::ChannelClass::*;
73    match method_id {
74        11 => OpenOk(OpenOkMethod { reserved1: decode_long_str(bytes) }),
75        20 => Flow(FlowMethod { active: decode_bool_1(bytes) }),
76        21 => FlowOk(FlowOkMethod { active: decode_bool_1(bytes) }),
77        40 => Close(CloseMethod {
78            reply_code: decode_short(bytes),
79            reply_text: decode_short_str(bytes),
80            class_id: decode_short(bytes),
81            method_id: decode_short(bytes),
82        }),
83        41 => CloseOk,
84        m => unreachable!("Unexpected method id {} in Channel class", m),
85    }
86}
87// }}}
88
89
90// Decode Exchange Class {{{
91fn decode_exchange_class(method_id: u16, _payload: &mut BytesMut) -> ExchangeClass {
92    use self::ExchangeClass::*;
93    match method_id {
94        11 => DeclareOk,
95        21 => DeleteOk,
96        31 => BindOk, // rabbitmq-specific extension
97        51 => UnbindOk, // rabbitmq-specific extension
98        m => unreachable!("Unexpected method id {} in Channel class", m),
99    }
100}
101// }}}
102
103
104// Decode Queue Class {{{
105fn decode_queue_class(method_id: u16, bytes: &mut BytesMut) -> QueueClass {
106    use frame::method::queue::*;
107    use self::QueueClass::*;
108    match method_id {
109        11 => DeclareOk(DeclareOkMethod {
110            queue: decode_short_str(bytes),
111            message_count: decode_long(bytes),
112            consumer_count: decode_long(bytes),
113        }),
114        21 => BindOk,
115        31 => PurgeOk(PurgeOkMethod { message_count: decode_long(bytes) }),
116        41 => DeleteOk(DeleteOkMethod { message_count: decode_long(bytes) }),
117        51 => UnbindOk,
118        m => unreachable!("Unexpected method id {} in Channel class", m),
119    }
120}
121// }}}
122
123
124// Decode Basic Class {{{
125fn decode_basic_class(method_id: u16, bytes: &mut BytesMut) -> BasicClass {
126    use frame::method::basic::*;
127    use self::BasicClass::*;
128    match method_id {
129        11 => QosOk,
130        21 => ConsumeOk(ConsumeOkMethod { consumer_tag: decode_short_str(bytes) }),
131        31 => CancelOk(CancelOkMethod { consumer_tag: decode_short_str(bytes) }),
132        50 => Return(ReturnMethod {
133            reply_code: decode_short(bytes),
134            reply_text: decode_short_str(bytes),
135            exchange: decode_short_str(bytes),
136            routing_key: decode_short_str(bytes),
137        }),
138        60 => Deliver(DeliverMethod {
139            consumer_tag: decode_short_str(bytes),
140            delivery_tag: decode_longlong(bytes),
141            redeliverd: decode_bool_1(bytes),
142            exchange: decode_short_str(bytes),
143            routing_key: decode_short_str(bytes),
144        }),
145        71 => GetOk(GetOkMethod {
146            delivery_tag: decode_longlong(bytes),
147            redeliverd: decode_bool_1(bytes),
148            exchange: decode_short_str(bytes),
149            routing_key: decode_short_str(bytes),
150            message_count: decode_long(bytes),
151        }),
152        72 => GetEmpty(GetEmptyMethod { reserved1: decode_short_str(bytes) }),
153        80 => Ack(AckMethod {
154            delivery_tag: decode_longlong(bytes),
155            multiple: decode_bool_1(bytes),
156        }),
157
158        // rabbitmq-specific extension
159        120 => Nack(NackMethod {
160            delivery_tag: decode_longlong(bytes),
161            multiple: decode_bool_1(bytes),
162        }),
163
164        m => unreachable!("Unexpected method id {} in Channel class", m),
165    }
166}
167// }}}
168
169
170// Decode Tx Class {{{
171fn decode_tx_class(method_id: u16, _bytes: &mut BytesMut) -> TxClass {
172    use self::TxClass::*;
173    match method_id {
174        11 => SelectOk,
175        21 => CommitOk,
176        31 => RollbackOk,
177        m => unreachable!("Unexpected method id {} in Connection class", m),
178    }
179}
180// }}}
181
182
183// Decode methods {{{
184fn decode_bool_1(bytes: &mut BytesMut) -> bool {
185    bytes.split_to(1)[0] & 0b_1000_0000 == 0b_1000_0000
186}
187
188
189fn decode_octet(bytes: &mut BytesMut) -> u8 {
190    bytes.split_to(1)[0]
191}
192
193
194fn decode_short(bytes: &mut BytesMut) -> u16 {
195    Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>()
196}
197
198
199fn decode_long(bytes: &mut BytesMut) -> u32 {
200    Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>()
201}
202
203
204fn decode_longlong(bytes: &mut BytesMut) -> u64 {
205    Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>()
206}
207
208
209fn decode_short_str(bytes: &mut BytesMut) -> AmqpString {
210    let len = bytes.split_to(1)[0];
211    AmqpString(bytes.split_to(len as usize).freeze())
212}
213
214
215fn decode_long_str(bytes: &mut BytesMut) -> AmqpString {
216    let len = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>();
217    AmqpString(bytes.split_to(len as usize).freeze())
218}
219
220
221pub(crate) fn decode_field_table(bytes: &mut BytesMut) -> HashMap<AmqpString, FieldArgument> {
222    debug!("decode field table");
223
224    let size = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>() as u64;
225
226    let mut bytes = bytes.split_to(size as usize);
227
228    let mut table = HashMap::new();
229
230    while bytes.len() > 0 {
231        let item_name = decode_short_str(&mut bytes);
232        let item_value = decode_field_item_value(&mut bytes);
233        table.insert(item_name, item_value);
234    }
235
236    table
237}
238
239
240/*
241fn decode_field_array(cursor: &mut Cursor<Bytes>) -> Result<FieldArray> {
242    let size = cursor.get_u32::<BigEndian>() as u64;
243
244    let mut items = Vec::new();
245    let start_pos = cursor.position();
246
247    while (cursor.position() - start_pos) < size {
248        let item = decode_field_item_value(cursor);
249        items.push(item);
250    }
251    Ok(FieldArray(items))
252}
253*/
254
255
256fn decode_field_item_value(bytes: &mut BytesMut) -> FieldArgument {
257    let flag = bytes.split_to(1)[0];
258    match flag {
259        0x74 => FieldArgument::Boolean(bytes.split_to(1)[0] == 0x01),
260        0x62 => FieldArgument::SignedOctet(Cursor::new(bytes.split_to(1)).get_i8()),
261        0x42 => FieldArgument::UnsignedOctet(bytes.split_to(1)[0]),
262        0x55 => FieldArgument::SignedShort(Cursor::new(bytes.split_to(2)).get_i16::<BigEndian>()),
263        0x75 => FieldArgument::UnsignedShort(Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>()),
264        0x49 => FieldArgument::SignedLong(Cursor::new(bytes.split_to(4)).get_i32::<BigEndian>()),
265        0x69 => FieldArgument::UnsignedLong(Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>()),
266        0x4C => FieldArgument::SignedLongLong(
267            Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>(),
268        ),
269        0x6C => FieldArgument::UnsignedLongLong(
270            Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>(),
271        ),
272        0x66 => FieldArgument::Float(Cursor::new(bytes.split_to(4)).get_f32::<BigEndian>()),
273        0x63 => FieldArgument::Double(Cursor::new(bytes.split_to(8)).get_f64::<BigEndian>()),
274        0x44 => FieldArgument::Decimal(Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>()),
275        0x73 => {
276            let len = bytes.split_to(1)[0];
277            FieldArgument::ShortString(AmqpString(bytes.split_to(len as usize).freeze()))
278        }
279        0x53 => {
280            let len = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>();
281            FieldArgument::LongString(AmqpString(bytes.split_to(len as usize).freeze()))
282        }
283        0x54 => FieldArgument::Timestamp(Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>()),
284        0x46 => FieldArgument::NestedTable(decode_field_table(bytes)),
285        0x56 => FieldArgument::Void,
286        0x78 => panic!(), // I don't know how should I treat it
287        b => unreachable!("Unexpected byte {} at decode_field_item_value", b),
288    }
289}
290// }}}