amqpr_codec/frame/method/
decoder.rs1use bytes::{BytesMut, BigEndian, Buf};
2
3use std::io::Cursor;
4use std::collections::HashMap;
5
6use frame::method::{MethodPayload, ConnectionClass, ChannelClass, ExchangeClass, QueueClass,
7 BasicClass, TxClass};
8use args::*;
9
10pub fn decode_payload(bytes: &mut BytesMut) -> MethodPayload {
13 let mut cursor = Cursor::new(bytes.split_off(4).freeze());
14
15 let class_id = cursor.get_u16::<BigEndian>();
16 debug!("class_id is {}", class_id);
17
18 let method_id = cursor.get_u16::<BigEndian>();
19 debug!("method_id is {}", method_id);
20
21 drop(cursor);
22
23 match class_id {
24 10 => MethodPayload::Connection(decode_connection_class(method_id, bytes)),
25 20 => MethodPayload::Channel(decode_channel_class(method_id, bytes)),
26 40 => MethodPayload::Exchange(decode_exchange_class(method_id, bytes)),
27 50 => MethodPayload::Queue(decode_queue_class(method_id, bytes)),
28 60 => MethodPayload::Basic(decode_basic_class(method_id, bytes)),
29 90 => MethodPayload::Tx(decode_tx_class(method_id, bytes)),
30 c => unreachable!("Unexpected class id {}", c),
31 }
32}
33
34
35fn decode_connection_class(method_id: u16, bytes: &mut BytesMut) -> ConnectionClass {
37 use frame::method::connection::*;
38 use self::ConnectionClass::*;
39 match method_id {
40 10 => Start(StartMethod {
41 version_major: decode_octet(bytes),
42 version_minor: decode_octet(bytes),
43 server_properties: decode_field_table(bytes),
44 mechanisms: decode_long_str(bytes),
45 locales: decode_long_str(bytes),
46 }),
47 20 => Secure(SecureMethod { challenge: decode_long_str(bytes) }),
48 30 => Tune(TuneMethod {
49 channel_max: decode_short(bytes),
50 frame_max: decode_long(bytes),
51 heartbeat: decode_short(bytes),
52 }),
53 41 => OpenOk(OpenOkMethod { reserved1: decode_short_str(bytes) }),
54 50 => Close(CloseMethod {
55 reply_code: decode_short(bytes),
56 reply_text: decode_short_str(bytes),
57 class_id: decode_short(bytes),
58 method_id: decode_short(bytes),
59 }),
60 51 => CloseOk,
61 60 => Blocked(BlockedMethod { reason: decode_short_str(bytes) }),
62 61 => Unblocked,
63 m => unreachable!("Unexpected method id {} in Connection class", m),
64 }
65}
66fn decode_channel_class(method_id: u16, bytes: &mut BytesMut) -> ChannelClass {
71 use frame::method::channel::*;
72 use self::ChannelClass::*;
73 match method_id {
74 11 => OpenOk(OpenOkMethod { reserved1: decode_long_str(bytes) }),
75 20 => Flow(FlowMethod { active: decode_bool_1(bytes) }),
76 21 => FlowOk(FlowOkMethod { active: decode_bool_1(bytes) }),
77 40 => Close(CloseMethod {
78 reply_code: decode_short(bytes),
79 reply_text: decode_short_str(bytes),
80 class_id: decode_short(bytes),
81 method_id: decode_short(bytes),
82 }),
83 41 => CloseOk,
84 m => unreachable!("Unexpected method id {} in Channel class", m),
85 }
86}
87fn decode_exchange_class(method_id: u16, _payload: &mut BytesMut) -> ExchangeClass {
92 use self::ExchangeClass::*;
93 match method_id {
94 11 => DeclareOk,
95 21 => DeleteOk,
96 31 => BindOk, 51 => UnbindOk, m => unreachable!("Unexpected method id {} in Channel class", m),
99 }
100}
101fn decode_queue_class(method_id: u16, bytes: &mut BytesMut) -> QueueClass {
106 use frame::method::queue::*;
107 use self::QueueClass::*;
108 match method_id {
109 11 => DeclareOk(DeclareOkMethod {
110 queue: decode_short_str(bytes),
111 message_count: decode_long(bytes),
112 consumer_count: decode_long(bytes),
113 }),
114 21 => BindOk,
115 31 => PurgeOk(PurgeOkMethod { message_count: decode_long(bytes) }),
116 41 => DeleteOk(DeleteOkMethod { message_count: decode_long(bytes) }),
117 51 => UnbindOk,
118 m => unreachable!("Unexpected method id {} in Channel class", m),
119 }
120}
121fn decode_basic_class(method_id: u16, bytes: &mut BytesMut) -> BasicClass {
126 use frame::method::basic::*;
127 use self::BasicClass::*;
128 match method_id {
129 11 => QosOk,
130 21 => ConsumeOk(ConsumeOkMethod { consumer_tag: decode_short_str(bytes) }),
131 31 => CancelOk(CancelOkMethod { consumer_tag: decode_short_str(bytes) }),
132 50 => Return(ReturnMethod {
133 reply_code: decode_short(bytes),
134 reply_text: decode_short_str(bytes),
135 exchange: decode_short_str(bytes),
136 routing_key: decode_short_str(bytes),
137 }),
138 60 => Deliver(DeliverMethod {
139 consumer_tag: decode_short_str(bytes),
140 delivery_tag: decode_longlong(bytes),
141 redeliverd: decode_bool_1(bytes),
142 exchange: decode_short_str(bytes),
143 routing_key: decode_short_str(bytes),
144 }),
145 71 => GetOk(GetOkMethod {
146 delivery_tag: decode_longlong(bytes),
147 redeliverd: decode_bool_1(bytes),
148 exchange: decode_short_str(bytes),
149 routing_key: decode_short_str(bytes),
150 message_count: decode_long(bytes),
151 }),
152 72 => GetEmpty(GetEmptyMethod { reserved1: decode_short_str(bytes) }),
153 80 => Ack(AckMethod {
154 delivery_tag: decode_longlong(bytes),
155 multiple: decode_bool_1(bytes),
156 }),
157
158 120 => Nack(NackMethod {
160 delivery_tag: decode_longlong(bytes),
161 multiple: decode_bool_1(bytes),
162 }),
163
164 m => unreachable!("Unexpected method id {} in Channel class", m),
165 }
166}
167fn decode_tx_class(method_id: u16, _bytes: &mut BytesMut) -> TxClass {
172 use self::TxClass::*;
173 match method_id {
174 11 => SelectOk,
175 21 => CommitOk,
176 31 => RollbackOk,
177 m => unreachable!("Unexpected method id {} in Connection class", m),
178 }
179}
180fn decode_bool_1(bytes: &mut BytesMut) -> bool {
185 bytes.split_to(1)[0] & 0b_1000_0000 == 0b_1000_0000
186}
187
188
189fn decode_octet(bytes: &mut BytesMut) -> u8 {
190 bytes.split_to(1)[0]
191}
192
193
194fn decode_short(bytes: &mut BytesMut) -> u16 {
195 Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>()
196}
197
198
199fn decode_long(bytes: &mut BytesMut) -> u32 {
200 Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>()
201}
202
203
204fn decode_longlong(bytes: &mut BytesMut) -> u64 {
205 Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>()
206}
207
208
209fn decode_short_str(bytes: &mut BytesMut) -> AmqpString {
210 let len = bytes.split_to(1)[0];
211 AmqpString(bytes.split_to(len as usize).freeze())
212}
213
214
215fn decode_long_str(bytes: &mut BytesMut) -> AmqpString {
216 let len = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>();
217 AmqpString(bytes.split_to(len as usize).freeze())
218}
219
220
221pub(crate) fn decode_field_table(bytes: &mut BytesMut) -> HashMap<AmqpString, FieldArgument> {
222 debug!("decode field table");
223
224 let size = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>() as u64;
225
226 let mut bytes = bytes.split_to(size as usize);
227
228 let mut table = HashMap::new();
229
230 while bytes.len() > 0 {
231 let item_name = decode_short_str(&mut bytes);
232 let item_value = decode_field_item_value(&mut bytes);
233 table.insert(item_name, item_value);
234 }
235
236 table
237}
238
239
240fn decode_field_item_value(bytes: &mut BytesMut) -> FieldArgument {
257 let flag = bytes.split_to(1)[0];
258 match flag {
259 0x74 => FieldArgument::Boolean(bytes.split_to(1)[0] == 0x01),
260 0x62 => FieldArgument::SignedOctet(Cursor::new(bytes.split_to(1)).get_i8()),
261 0x42 => FieldArgument::UnsignedOctet(bytes.split_to(1)[0]),
262 0x55 => FieldArgument::SignedShort(Cursor::new(bytes.split_to(2)).get_i16::<BigEndian>()),
263 0x75 => FieldArgument::UnsignedShort(Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>()),
264 0x49 => FieldArgument::SignedLong(Cursor::new(bytes.split_to(4)).get_i32::<BigEndian>()),
265 0x69 => FieldArgument::UnsignedLong(Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>()),
266 0x4C => FieldArgument::SignedLongLong(
267 Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>(),
268 ),
269 0x6C => FieldArgument::UnsignedLongLong(
270 Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>(),
271 ),
272 0x66 => FieldArgument::Float(Cursor::new(bytes.split_to(4)).get_f32::<BigEndian>()),
273 0x63 => FieldArgument::Double(Cursor::new(bytes.split_to(8)).get_f64::<BigEndian>()),
274 0x44 => FieldArgument::Decimal(Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>()),
275 0x73 => {
276 let len = bytes.split_to(1)[0];
277 FieldArgument::ShortString(AmqpString(bytes.split_to(len as usize).freeze()))
278 }
279 0x53 => {
280 let len = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>();
281 FieldArgument::LongString(AmqpString(bytes.split_to(len as usize).freeze()))
282 }
283 0x54 => FieldArgument::Timestamp(Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>()),
284 0x46 => FieldArgument::NestedTable(decode_field_table(bytes)),
285 0x56 => FieldArgument::Void,
286 0x78 => panic!(), b => unreachable!("Unexpected byte {} at decode_field_item_value", b),
288 }
289}
290