amqpr_codec/frame/method/
encoder.rs1use bytes::{BufMut, BigEndian};
2
3use std::collections::HashMap;
4
5use super::{MethodPayload, ConnectionClass, ChannelClass, ExchangeClass, QueueClass, BasicClass,
6 TxClass};
7use args::*;
8
9
10pub fn encode_payload(payload: MethodPayload) -> Vec<u8> {
11 use self::MethodPayload::*;
12 match payload {
13 Connection(class) => encode_connection_class(class),
14 Channel(class) => encode_channel_class(class),
15 Exchange(class) => encode_exchange_class(class),
16 Queue(class) => encode_queue_class(class),
17 Basic(class) => encode_basic_class(class),
18 Tx(class) => encode_tx_class(class),
19 }
20}
21
22
23struct InnerEncoder {
24 buf: Vec<u8>,
25}
26
27
28fn encode_connection_class(class: ConnectionClass) -> Vec<u8> {
30 const CLASS_ID: u16 = 10;
31 use self::ConnectionClass::*;
32 match class {
33 StartOk(m) => {
34 InnerEncoder::class_and_method_id(CLASS_ID, 11)
35 .encode_field_table(m.client_properties)
36 .encode_short_str(m.mechanism)
37 .encode_long_str(m.response)
38 .encode_short_str(m.locale)
39 .vec()
40 }
41 SecureOk(m) => {
42 InnerEncoder::class_and_method_id(CLASS_ID, 21)
43 .encode_long_str(m.response)
44 .vec()
45 }
46 TuneOk(m) => {
47 InnerEncoder::class_and_method_id(CLASS_ID, 31)
48 .encode_short(m.channel_max)
49 .encode_long(m.frame_max)
50 .encode_short(m.heartbeat)
51 .vec()
52 }
53 Open(m) => {
54 InnerEncoder::class_and_method_id(CLASS_ID, 40)
55 .encode_short_str(m.virtual_host)
56 .encode_short_str(m.reserved1)
57 .encode_bit_1(m.reserved2)
58 .vec()
59 }
60 Close(m) => {
61 InnerEncoder::class_and_method_id(CLASS_ID, 50)
62 .encode_short(m.reply_code)
63 .encode_short_str(m.reply_text)
64 .encode_short(m.class_id)
65 .encode_short(m.method_id)
66 .vec()
67 }
68 CloseOk => InnerEncoder::class_and_method_id(CLASS_ID, 51).vec(),
69 _ => unreachable!("Others methods are never be sent by client"),
70 }
71}
72fn encode_channel_class(class: ChannelClass) -> Vec<u8> {
77 const CLASS_ID: u16 = 20;
78 use self::ChannelClass::*;
79 match class {
80 Open(m) => {
81 InnerEncoder::class_and_method_id(CLASS_ID, 10)
82 .encode_short_str(m.reserved1)
83 .vec()
84 }
85 Flow(m) => {
86 InnerEncoder::class_and_method_id(CLASS_ID, 20)
87 .encode_bit_1(m.active)
88 .vec()
89 }
90 FlowOk(m) => {
91 InnerEncoder::class_and_method_id(CLASS_ID, 21)
92 .encode_bit_1(m.active)
93 .vec()
94 }
95 Close(m) => {
96 InnerEncoder::class_and_method_id(CLASS_ID, 40)
97 .encode_short(m.reply_code)
98 .encode_short_str(m.reply_text)
99 .encode_short(m.class_id)
100 .encode_short(m.method_id)
101 .vec()
102 }
103 CloseOk => InnerEncoder::class_and_method_id(CLASS_ID, 41).vec(),
104 _ => unreachable!("Others methods are never be sent by client"),
105 }
106}
107fn encode_exchange_class(class: ExchangeClass) -> Vec<u8> {
112 const CLASS_ID: u16 = 40;
113 use self::ExchangeClass::*;
114 match class {
115 Declare(m) => {
116 InnerEncoder::class_and_method_id(CLASS_ID, 10)
117 .encode_short(m.reserved1)
118 .encode_short_str(m.exchange)
119 .encode_short_str(m.typ)
120 .encode_bit_5(m.passive, m.durable, m.auto_delete, m.internal, m.no_wait)
121 .encode_field_table(m.arguments)
122 .vec()
123 }
124 Delete(m) => {
125 InnerEncoder::class_and_method_id(CLASS_ID, 20)
126 .encode_short(m.reserved1)
127 .encode_short_str(m.exchange)
128 .encode_bit_2(m.if_unused, m.no_wait)
129 .vec()
130 }
131 Bind(m) => {
132 InnerEncoder::class_and_method_id(CLASS_ID, 30)
133 .encode_short(m.reserved1)
134 .encode_short_str(m.destination)
135 .encode_short_str(m.source)
136 .encode_short_str(m.routing_key)
137 .encode_bit_1(m.no_wait)
138 .encode_field_table(m.arguments)
139 .vec()
140 }
141 Unbind(m) => {
142 InnerEncoder::class_and_method_id(CLASS_ID, 40)
143 .encode_short(m.reserved1)
144 .encode_short_str(m.destination)
145 .encode_short_str(m.source)
146 .encode_short_str(m.routing_key)
147 .encode_bit_1(m.no_wait)
148 .encode_field_table(m.arguments)
149 .vec()
150 }
151 _ => unreachable!("Others methods are never be sent by client"),
152 }
153}
154fn encode_queue_class(class: QueueClass) -> Vec<u8> {
159 const CLASS_ID: u16 = 50;
160 use self::QueueClass::*;
161 match class {
162 Declare(m) => {
163 InnerEncoder::class_and_method_id(CLASS_ID, 10)
164 .encode_short(m.reserved1)
165 .encode_short_str(m.queue)
166 .encode_bit_5(m.passive, m.durable, m.exclusive, m.auto_delete, m.no_wait)
167 .encode_field_table(m.arguments)
168 .vec()
169 }
170 Bind(m) => {
171 InnerEncoder::class_and_method_id(CLASS_ID, 20)
172 .encode_short(m.reserved1)
173 .encode_short_str(m.queue)
174 .encode_short_str(m.exchange)
175 .encode_short_str(m.routing_key)
176 .encode_bit_1(m.no_wait)
177 .encode_field_table(m.arguments)
178 .vec()
179 }
180 Unbind(m) => {
181 InnerEncoder::class_and_method_id(CLASS_ID, 50)
182 .encode_short(m.reserved1)
183 .encode_short_str(m.queue)
184 .encode_short_str(m.exchange)
185 .encode_short_str(m.routing_key)
186 .encode_field_table(m.arguments)
187 .vec()
188 }
189 Purge(m) => {
190 InnerEncoder::class_and_method_id(CLASS_ID, 30)
191 .encode_short(m.reserved1)
192 .encode_short_str(m.queue)
193 .encode_bit_1(m.no_wait)
194 .vec()
195 }
196 Delete(m) => {
197 InnerEncoder::class_and_method_id(CLASS_ID, 40)
198 .encode_short(m.reserved1)
199 .encode_short_str(m.queue)
200 .encode_bit_3(m.if_unused, m.if_empty, m.no_wait)
201 .vec()
202 }
203 _ => unreachable!("Others methods are never be sent by client"),
204 }
205}
206fn encode_basic_class(class: BasicClass) -> Vec<u8> {
211 const CLASS_ID: u16 = 60;
212 use self::BasicClass::*;
213 match class {
214 Qos(m) => {
215 InnerEncoder::class_and_method_id(CLASS_ID, 10)
216 .encode_long(m.prefetch_size)
217 .encode_short(m.prefetch_count)
218 .encode_bit_1(m.global)
219 .vec()
220 }
221 Consume(m) => {
222 InnerEncoder::class_and_method_id(CLASS_ID, 20)
223 .encode_short(m.reserved1)
224 .encode_short_str(m.queue)
225 .encode_short_str(m.consumer_tag)
226 .encode_bit_4(m.no_local, m.no_ack, m.exclusive, m.no_wait)
227 .encode_field_table(m.arguments)
228 .vec()
229 }
230 Cancel(m) => {
231 InnerEncoder::class_and_method_id(CLASS_ID, 30)
232 .encode_short_str(m.consumer_tag)
233 .encode_bit_1(m.no_wait)
234 .vec()
235 }
236 Publish(m) => {
237 InnerEncoder::class_and_method_id(CLASS_ID, 40)
238 .encode_short(m.reserved1)
239 .encode_short_str(m.exchange)
240 .encode_short_str(m.routing_key)
241 .encode_bit_2(m.mandatory, m.immediate)
242 .vec()
243 }
244 Get(m) => {
245 InnerEncoder::class_and_method_id(CLASS_ID, 70)
246 .encode_short(m.reserved1)
247 .encode_short_str(m.queue)
248 .encode_bit_1(m.no_ack)
249 .vec()
250 }
251 Ack(m) => {
252 InnerEncoder::class_and_method_id(CLASS_ID, 80)
253 .encode_longlong(m.delivery_tag)
254 .encode_bit_1(m.multiple)
255 .vec()
256 }
257 Reject(m) => {
258 InnerEncoder::class_and_method_id(CLASS_ID, 90)
259 .encode_longlong(m.delivery_tag)
260 .encode_bit_1(m.requeue)
261 .vec()
262 }
263 Nack(m) => {
264 InnerEncoder::class_and_method_id(CLASS_ID, 120)
265 .encode_longlong(m.delivery_tag)
266 .encode_bit_1(m.multiple)
267 .vec()
268 }
269 RecoverAsync(m) => {
270 InnerEncoder::class_and_method_id(CLASS_ID, 100)
271 .encode_bit_1(m.requeue)
272 .vec()
273 }
274 Recover(m) => {
275 InnerEncoder::class_and_method_id(CLASS_ID, 110)
276 .encode_bit_1(m.requeue)
277 .vec()
278 }
279 _ => unreachable!("Others methods are never be sent by client"),
280 }
281}
282fn encode_tx_class(class: TxClass) -> Vec<u8> {
287 const CLASS_ID: u16 = 90;
288 use self::TxClass::*;
289 match class {
290 Select => InnerEncoder::class_and_method_id(CLASS_ID, 10).vec(),
291 Commit => InnerEncoder::class_and_method_id(CLASS_ID, 20).vec(),
292 Rollback => InnerEncoder::class_and_method_id(CLASS_ID, 30).vec(),
293 _ => unreachable!("Others methods are never be sent by client"),
294 }
295}
296impl InnerEncoder {
302 fn class_and_method_id(class_id: u16, method_id: u16) -> InnerEncoder {
303 const INITIAL_CAPACITY: usize = 8;
304
305 let mut buf = Vec::with_capacity(INITIAL_CAPACITY);
306 buf.put_u16::<BigEndian>(class_id);
307 buf.put_u16::<BigEndian>(method_id);
308
309 InnerEncoder { buf: buf }
310 }
311
312 #[allow(dead_code)]
314 fn encode_octet(mut self, octet: u8) -> InnerEncoder {
315 self.buf.put_u8(octet);
316 self
317 }
318
319 fn encode_short(mut self, short: u16) -> InnerEncoder {
320 self.buf.put_u16::<BigEndian>(short);
321 self
322 }
323
324 fn encode_long(mut self, long: u32) -> InnerEncoder {
325 self.buf.put_u32::<BigEndian>(long);
326 self
327 }
328
329 fn encode_longlong(mut self, longlong: u64) -> InnerEncoder {
330 self.buf.put_u64::<BigEndian>(longlong);
331 self
332 }
333
334 fn encode_bit_1(self, bit: bool) -> InnerEncoder {
335 self.encode_bit_5(bit, false, false, false, false)
336 }
337
338 fn encode_bit_2(self, bit1: bool, bit2: bool) -> InnerEncoder {
339 self.encode_bit_5(bit1, bit2, false, false, false)
340 }
341
342 fn encode_bit_3(self, bit1: bool, bit2: bool, bit3: bool) -> InnerEncoder {
343 self.encode_bit_5(bit1, bit2, bit3, false, false)
344 }
345
346 fn encode_bit_4(self, bit1: bool, bit2: bool, bit3: bool, bit4: bool) -> InnerEncoder {
347 self.encode_bit_5(bit1, bit2, bit3, bit4, false)
348 }
349
350 fn encode_bit_5(
351 mut self,
352 bit1: bool,
353 bit2: bool,
354 bit3: bool,
355 bit4: bool,
356 bit5: bool,
357 ) -> InnerEncoder {
358 let byte = (bit1 as u8 * 0b_0000_0001) + (bit2 as u8 * 0b_0000_0010) +
359 (bit3 as u8 * 0b_0000_0100) + (bit4 as u8 * 0b_0000_1000) +
360 (bit5 as u8 * 0b_0001_0000);
361 self.buf.put_u8(byte);
362 self
363 }
364
365 fn encode_short_str(mut self, string: AmqpString) -> InnerEncoder {
366 self.buf.put_u8(string.len() as u8);
367 self.buf.put(string.as_bytes());
368 self
369 }
370
371 fn encode_long_str(mut self, string: AmqpString) -> InnerEncoder {
372 self.buf.put_u32::<BigEndian>(string.len() as u32);
373 self.buf.put(string.as_bytes());
374 self
375 }
376
377 fn encode_field_table(mut self, table: HashMap<AmqpString, FieldArgument>) -> InnerEncoder {
378 encode_field_table_0(&table, &mut self.buf);
379 self
380 }
381
382 fn vec(self) -> Vec<u8> {
383 self.buf
384 }
385}
386pub(crate) fn encode_field_table_0(table: &HashMap<AmqpString, FieldArgument>, dst: &mut Vec<u8>) {
391 let mut bytes = {
392 let mut buf = Vec::new();
393 for (item_name, item_value) in table.iter() {
394 buf.put_u8(item_name.len() as u8);
395 buf.put_slice(item_name.as_bytes());
396 encode_field_item(item_value, &mut buf);
397 }
398 buf
399 };
400
401 dst.put_u32::<BigEndian>(bytes.len() as u32);
402 dst.append(&mut bytes);
403}
404
405
406fn encode_field_item(item: &FieldArgument, dst: &mut Vec<u8>) {
423 match item {
424 &FieldArgument::Boolean(b) => {
425 dst.put_u8(b't');
426 dst.put_u8(b as u8);
427 }
428 &FieldArgument::SignedOctet(byte) => {
429 dst.put_u8(b'b');
430 dst.put_i8(byte);
431 }
432 &FieldArgument::UnsignedOctet(byte) => {
433 dst.put_u8(b'B');
434 dst.put_u8(byte);
435 }
436 &FieldArgument::SignedShort(short) => {
437 dst.put_u8(b'U');
438 dst.put_i16::<BigEndian>(short);
439 }
440 &FieldArgument::UnsignedShort(short) => {
441 dst.put_u8(b'u');
442 dst.put_u16::<BigEndian>(short);
443 }
444 &FieldArgument::SignedLong(long) => {
445 dst.put_u8(b'I');
446 dst.put_i32::<BigEndian>(long);
447 }
448 &FieldArgument::UnsignedLong(long) => {
449 dst.put_u8(b'i');
450 dst.put_u32::<BigEndian>(long);
451 }
452 &FieldArgument::SignedLongLong(longlong) => {
453 dst.put_u8(b'L');
454 dst.put_i64::<BigEndian>(longlong);
455 }
456 &FieldArgument::UnsignedLongLong(longlong) => {
457 dst.put_u8(b'l');
458 dst.put_u64::<BigEndian>(longlong);
459 }
460 &FieldArgument::Float(float) => {
461 dst.put_u8(b'f');
462 dst.put_f32::<BigEndian>(float);
463 }
464 &FieldArgument::Double(double) => {
465 dst.put_u8(b'd');
466 dst.put_f64::<BigEndian>(double);
467 }
468 &FieldArgument::Decimal(decimal) => {
469 dst.put_u8(b'D');
470 dst.put_i64::<BigEndian>(decimal);
471 }
472 &FieldArgument::ShortString(ref s) => {
473 dst.put_u8(b's');
474 dst.put_u8(s.len() as u8);
475 dst.put(s.as_bytes());
476 }
477 &FieldArgument::LongString(ref s) => {
478 dst.put_u8(b'S');
479 dst.put_u32::<BigEndian>(s.len() as u32);
480 dst.put(s.as_bytes());
481 }
482 &FieldArgument::Timestamp(ts) => {
483 dst.put_u8(b'T');
484 dst.put_u64::<BigEndian>(ts);
485 }
486 &FieldArgument::NestedTable(ref table) => {
487 dst.put_u8(b'F');
488 encode_field_table_0(table, dst);
489 }
490 &FieldArgument::Void => {
491 dst.put_u8(b'V');
492 }
493 &FieldArgument::ByteArray(ref _array) => {
494 dst.put_u8(b'x');
495 panic!("Fail to parse ByteArray") }
497 }
498}
499