amqpr_codec/frame/method/
encoder.rs

1use bytes::{BufMut, BigEndian};
2
3use std::collections::HashMap;
4
5use super::{MethodPayload, ConnectionClass, ChannelClass, ExchangeClass, QueueClass, BasicClass,
6            TxClass};
7use args::*;
8
9
10pub fn encode_payload(payload: MethodPayload) -> Vec<u8> {
11    use self::MethodPayload::*;
12    match payload {
13        Connection(class) => encode_connection_class(class),
14        Channel(class) => encode_channel_class(class),
15        Exchange(class) => encode_exchange_class(class),
16        Queue(class) => encode_queue_class(class),
17        Basic(class) => encode_basic_class(class),
18        Tx(class) => encode_tx_class(class),
19    }
20}
21
22
23struct InnerEncoder {
24    buf: Vec<u8>,
25}
26
27
28// Encode Connection Class {{{
29fn encode_connection_class(class: ConnectionClass) -> Vec<u8> {
30    const CLASS_ID: u16 = 10;
31    use self::ConnectionClass::*;
32    match class {
33        StartOk(m) => {
34            InnerEncoder::class_and_method_id(CLASS_ID, 11)
35                .encode_field_table(m.client_properties)
36                .encode_short_str(m.mechanism)
37                .encode_long_str(m.response)
38                .encode_short_str(m.locale)
39                .vec()
40        }
41        SecureOk(m) => {
42            InnerEncoder::class_and_method_id(CLASS_ID, 21)
43                .encode_long_str(m.response)
44                .vec()
45        }
46        TuneOk(m) => {
47            InnerEncoder::class_and_method_id(CLASS_ID, 31)
48                .encode_short(m.channel_max)
49                .encode_long(m.frame_max)
50                .encode_short(m.heartbeat)
51                .vec()
52        }
53        Open(m) => {
54            InnerEncoder::class_and_method_id(CLASS_ID, 40)
55                .encode_short_str(m.virtual_host)
56                .encode_short_str(m.reserved1)
57                .encode_bit_1(m.reserved2)
58                .vec()
59        }
60        Close(m) => {
61            InnerEncoder::class_and_method_id(CLASS_ID, 50)
62                .encode_short(m.reply_code)
63                .encode_short_str(m.reply_text)
64                .encode_short(m.class_id)
65                .encode_short(m.method_id)
66                .vec()
67        }
68        CloseOk => InnerEncoder::class_and_method_id(CLASS_ID, 51).vec(),
69        _ => unreachable!("Others methods are never be sent by client"),
70    }
71}
72// }}}
73
74
75// Encode Channel Class  {{{
76fn encode_channel_class(class: ChannelClass) -> Vec<u8> {
77    const CLASS_ID: u16 = 20;
78    use self::ChannelClass::*;
79    match class {
80        Open(m) => {
81            InnerEncoder::class_and_method_id(CLASS_ID, 10)
82                .encode_short_str(m.reserved1)
83                .vec()
84        }
85        Flow(m) => {
86            InnerEncoder::class_and_method_id(CLASS_ID, 20)
87                .encode_bit_1(m.active)
88                .vec()
89        }
90        FlowOk(m) => {
91            InnerEncoder::class_and_method_id(CLASS_ID, 21)
92                .encode_bit_1(m.active)
93                .vec()
94        }
95        Close(m) => {
96            InnerEncoder::class_and_method_id(CLASS_ID, 40)
97                .encode_short(m.reply_code)
98                .encode_short_str(m.reply_text)
99                .encode_short(m.class_id)
100                .encode_short(m.method_id)
101                .vec()
102        }
103        CloseOk => InnerEncoder::class_and_method_id(CLASS_ID, 41).vec(),
104        _ => unreachable!("Others methods are never be sent by client"),
105    }
106}
107// }}}
108
109
110// Encode Exchange Class {{{
111fn encode_exchange_class(class: ExchangeClass) -> Vec<u8> {
112    const CLASS_ID: u16 = 40;
113    use self::ExchangeClass::*;
114    match class {
115        Declare(m) => {
116            InnerEncoder::class_and_method_id(CLASS_ID, 10)
117                .encode_short(m.reserved1)
118                .encode_short_str(m.exchange)
119                .encode_short_str(m.typ)
120                .encode_bit_5(m.passive, m.durable, m.auto_delete, m.internal, m.no_wait)
121                .encode_field_table(m.arguments)
122                .vec()
123        }
124        Delete(m) => {
125            InnerEncoder::class_and_method_id(CLASS_ID, 20)
126                .encode_short(m.reserved1)
127                .encode_short_str(m.exchange)
128                .encode_bit_2(m.if_unused, m.no_wait)
129                .vec()
130        }
131        Bind(m) => {
132            InnerEncoder::class_and_method_id(CLASS_ID, 30)
133                .encode_short(m.reserved1)
134                .encode_short_str(m.destination)
135                .encode_short_str(m.source)
136                .encode_short_str(m.routing_key)
137                .encode_bit_1(m.no_wait)
138                .encode_field_table(m.arguments)
139                .vec()
140        }
141        Unbind(m) => {
142            InnerEncoder::class_and_method_id(CLASS_ID, 40)
143                .encode_short(m.reserved1)
144                .encode_short_str(m.destination)
145                .encode_short_str(m.source)
146                .encode_short_str(m.routing_key)
147                .encode_bit_1(m.no_wait)
148                .encode_field_table(m.arguments)
149                .vec()
150        }
151        _ => unreachable!("Others methods are never be sent by client"),
152    }
153}
154// }}}
155
156
157// Encode Queue Class {{{
158fn encode_queue_class(class: QueueClass) -> Vec<u8> {
159    const CLASS_ID: u16 = 50;
160    use self::QueueClass::*;
161    match class {
162        Declare(m) => {
163            InnerEncoder::class_and_method_id(CLASS_ID, 10)
164                .encode_short(m.reserved1)
165                .encode_short_str(m.queue)
166                .encode_bit_5(m.passive, m.durable, m.exclusive, m.auto_delete, m.no_wait)
167                .encode_field_table(m.arguments)
168                .vec()
169        }
170        Bind(m) => {
171            InnerEncoder::class_and_method_id(CLASS_ID, 20)
172                .encode_short(m.reserved1)
173                .encode_short_str(m.queue)
174                .encode_short_str(m.exchange)
175                .encode_short_str(m.routing_key)
176                .encode_bit_1(m.no_wait)
177                .encode_field_table(m.arguments)
178                .vec()
179        }
180        Unbind(m) => {
181            InnerEncoder::class_and_method_id(CLASS_ID, 50)
182                .encode_short(m.reserved1)
183                .encode_short_str(m.queue)
184                .encode_short_str(m.exchange)
185                .encode_short_str(m.routing_key)
186                .encode_field_table(m.arguments)
187                .vec()
188        }
189        Purge(m) => {
190            InnerEncoder::class_and_method_id(CLASS_ID, 30)
191                .encode_short(m.reserved1)
192                .encode_short_str(m.queue)
193                .encode_bit_1(m.no_wait)
194                .vec()
195        }
196        Delete(m) => {
197            InnerEncoder::class_and_method_id(CLASS_ID, 40)
198                .encode_short(m.reserved1)
199                .encode_short_str(m.queue)
200                .encode_bit_3(m.if_unused, m.if_empty, m.no_wait)
201                .vec()
202        }
203        _ => unreachable!("Others methods are never be sent by client"),
204    }
205}
206// }}}
207
208
209// Encode Basic Class {{{
210fn encode_basic_class(class: BasicClass) -> Vec<u8> {
211    const CLASS_ID: u16 = 60;
212    use self::BasicClass::*;
213    match class {
214        Qos(m) => {
215            InnerEncoder::class_and_method_id(CLASS_ID, 10)
216                .encode_long(m.prefetch_size)
217                .encode_short(m.prefetch_count)
218                .encode_bit_1(m.global)
219                .vec()
220        }
221        Consume(m) => {
222            InnerEncoder::class_and_method_id(CLASS_ID, 20)
223                .encode_short(m.reserved1)
224                .encode_short_str(m.queue)
225                .encode_short_str(m.consumer_tag)
226                .encode_bit_4(m.no_local, m.no_ack, m.exclusive, m.no_wait)
227                .encode_field_table(m.arguments)
228                .vec()
229        }
230        Cancel(m) => {
231            InnerEncoder::class_and_method_id(CLASS_ID, 30)
232                .encode_short_str(m.consumer_tag)
233                .encode_bit_1(m.no_wait)
234                .vec()
235        }
236        Publish(m) => {
237            InnerEncoder::class_and_method_id(CLASS_ID, 40)
238                .encode_short(m.reserved1)
239                .encode_short_str(m.exchange)
240                .encode_short_str(m.routing_key)
241                .encode_bit_2(m.mandatory, m.immediate)
242                .vec()
243        }
244        Get(m) => {
245            InnerEncoder::class_and_method_id(CLASS_ID, 70)
246                .encode_short(m.reserved1)
247                .encode_short_str(m.queue)
248                .encode_bit_1(m.no_ack)
249                .vec()
250        }
251        Ack(m) => {
252            InnerEncoder::class_and_method_id(CLASS_ID, 80)
253                .encode_longlong(m.delivery_tag)
254                .encode_bit_1(m.multiple)
255                .vec()
256        }
257        Reject(m) => {
258            InnerEncoder::class_and_method_id(CLASS_ID, 90)
259                .encode_longlong(m.delivery_tag)
260                .encode_bit_1(m.requeue)
261                .vec()
262        }
263        Nack(m) => {
264            InnerEncoder::class_and_method_id(CLASS_ID, 120)
265                .encode_longlong(m.delivery_tag)
266                .encode_bit_1(m.multiple)
267                .vec()
268        }
269        RecoverAsync(m) => {
270            InnerEncoder::class_and_method_id(CLASS_ID, 100)
271                .encode_bit_1(m.requeue)
272                .vec()
273        }
274        Recover(m) => {
275            InnerEncoder::class_and_method_id(CLASS_ID, 110)
276                .encode_bit_1(m.requeue)
277                .vec()
278        }
279        _ => unreachable!("Others methods are never be sent by client"),
280    }
281}
282// }}}
283
284
285// Encode Tx Class {{{
286fn encode_tx_class(class: TxClass) -> Vec<u8> {
287    const CLASS_ID: u16 = 90;
288    use self::TxClass::*;
289    match class {
290        Select => InnerEncoder::class_and_method_id(CLASS_ID, 10).vec(),
291        Commit => InnerEncoder::class_and_method_id(CLASS_ID, 20).vec(),
292        Rollback => InnerEncoder::class_and_method_id(CLASS_ID, 30).vec(),
293        _ => unreachable!("Others methods are never be sent by client"),
294    }
295}
296// }}}
297
298
299
300// impl InnerEncoder {{{
301impl InnerEncoder {
302    fn class_and_method_id(class_id: u16, method_id: u16) -> InnerEncoder {
303        const INITIAL_CAPACITY: usize = 8;
304
305        let mut buf = Vec::with_capacity(INITIAL_CAPACITY);
306        buf.put_u16::<BigEndian>(class_id);
307        buf.put_u16::<BigEndian>(method_id);
308
309        InnerEncoder { buf: buf }
310    }
311
312    // For now, any encoded method does not have octet field
313    #[allow(dead_code)]
314    fn encode_octet(mut self, octet: u8) -> InnerEncoder {
315        self.buf.put_u8(octet);
316        self
317    }
318
319    fn encode_short(mut self, short: u16) -> InnerEncoder {
320        self.buf.put_u16::<BigEndian>(short);
321        self
322    }
323
324    fn encode_long(mut self, long: u32) -> InnerEncoder {
325        self.buf.put_u32::<BigEndian>(long);
326        self
327    }
328
329    fn encode_longlong(mut self, longlong: u64) -> InnerEncoder {
330        self.buf.put_u64::<BigEndian>(longlong);
331        self
332    }
333
334    fn encode_bit_1(self, bit: bool) -> InnerEncoder {
335        self.encode_bit_5(bit, false, false, false, false)
336    }
337
338    fn encode_bit_2(self, bit1: bool, bit2: bool) -> InnerEncoder {
339        self.encode_bit_5(bit1, bit2, false, false, false)
340    }
341
342    fn encode_bit_3(self, bit1: bool, bit2: bool, bit3: bool) -> InnerEncoder {
343        self.encode_bit_5(bit1, bit2, bit3, false, false)
344    }
345
346    fn encode_bit_4(self, bit1: bool, bit2: bool, bit3: bool, bit4: bool) -> InnerEncoder {
347        self.encode_bit_5(bit1, bit2, bit3, bit4, false)
348    }
349
350    fn encode_bit_5(
351        mut self,
352        bit1: bool,
353        bit2: bool,
354        bit3: bool,
355        bit4: bool,
356        bit5: bool,
357    ) -> InnerEncoder {
358        let byte = (bit1 as u8 * 0b_0000_0001) + (bit2 as u8 * 0b_0000_0010) +
359            (bit3 as u8 * 0b_0000_0100) + (bit4 as u8 * 0b_0000_1000) +
360            (bit5 as u8 * 0b_0001_0000);
361        self.buf.put_u8(byte);
362        self
363    }
364
365    fn encode_short_str(mut self, string: AmqpString) -> InnerEncoder {
366        self.buf.put_u8(string.len() as u8);
367        self.buf.put(string.as_bytes());
368        self
369    }
370
371    fn encode_long_str(mut self, string: AmqpString) -> InnerEncoder {
372        self.buf.put_u32::<BigEndian>(string.len() as u32);
373        self.buf.put(string.as_bytes());
374        self
375    }
376
377    fn encode_field_table(mut self, table: HashMap<AmqpString, FieldArgument>) -> InnerEncoder {
378        encode_field_table_0(&table, &mut self.buf);
379        self
380    }
381
382    fn vec(self) -> Vec<u8> {
383        self.buf
384    }
385}
386// }}}
387
388
389// Encode field-table and field-array {{{
390pub(crate) fn encode_field_table_0(table: &HashMap<AmqpString, FieldArgument>, dst: &mut Vec<u8>) {
391    let mut bytes = {
392        let mut buf = Vec::new();
393        for (item_name, item_value) in table.iter() {
394            buf.put_u8(item_name.len() as u8);
395            buf.put_slice(item_name.as_bytes());
396            encode_field_item(item_value, &mut buf);
397        }
398        buf
399    };
400
401    dst.put_u32::<BigEndian>(bytes.len() as u32);
402    dst.append(&mut bytes);
403}
404
405
406/*
407fn encode_field_array(array: &FieldArray, dst: &mut Vec<u8>) {
408    let mut bytes = {
409        let mut buf = Vec::new();
410        for item in array.iter() {
411            encode_field_item(item, &mut buf);
412        }
413        buf
414    };
415
416    dst.put_u32::<BigEndian>(bytes.len() as u32);
417    dst.append(&mut bytes);
418}
419*/
420
421
422fn encode_field_item(item: &FieldArgument, dst: &mut Vec<u8>) {
423    match item {
424        &FieldArgument::Boolean(b) => {
425            dst.put_u8(b't');
426            dst.put_u8(b as u8);
427        }
428        &FieldArgument::SignedOctet(byte) => {
429            dst.put_u8(b'b');
430            dst.put_i8(byte);
431        }
432        &FieldArgument::UnsignedOctet(byte) => {
433            dst.put_u8(b'B');
434            dst.put_u8(byte);
435        }
436        &FieldArgument::SignedShort(short) => {
437            dst.put_u8(b'U');
438            dst.put_i16::<BigEndian>(short);
439        }
440        &FieldArgument::UnsignedShort(short) => {
441            dst.put_u8(b'u');
442            dst.put_u16::<BigEndian>(short);
443        }
444        &FieldArgument::SignedLong(long) => {
445            dst.put_u8(b'I');
446            dst.put_i32::<BigEndian>(long);
447        }
448        &FieldArgument::UnsignedLong(long) => {
449            dst.put_u8(b'i');
450            dst.put_u32::<BigEndian>(long);
451        }
452        &FieldArgument::SignedLongLong(longlong) => {
453            dst.put_u8(b'L');
454            dst.put_i64::<BigEndian>(longlong);
455        }
456        &FieldArgument::UnsignedLongLong(longlong) => {
457            dst.put_u8(b'l');
458            dst.put_u64::<BigEndian>(longlong);
459        }
460        &FieldArgument::Float(float) => {
461            dst.put_u8(b'f');
462            dst.put_f32::<BigEndian>(float);
463        }
464        &FieldArgument::Double(double) => {
465            dst.put_u8(b'd');
466            dst.put_f64::<BigEndian>(double);
467        }
468        &FieldArgument::Decimal(decimal) => {
469            dst.put_u8(b'D');
470            dst.put_i64::<BigEndian>(decimal);
471        }
472        &FieldArgument::ShortString(ref s) => {
473            dst.put_u8(b's');
474            dst.put_u8(s.len() as u8);
475            dst.put(s.as_bytes());
476        }
477        &FieldArgument::LongString(ref s) => {
478            dst.put_u8(b'S');
479            dst.put_u32::<BigEndian>(s.len() as u32);
480            dst.put(s.as_bytes());
481        }
482        &FieldArgument::Timestamp(ts) => {
483            dst.put_u8(b'T');
484            dst.put_u64::<BigEndian>(ts);
485        }
486        &FieldArgument::NestedTable(ref table) => {
487            dst.put_u8(b'F');
488            encode_field_table_0(table, dst);
489        }
490        &FieldArgument::Void => {
491            dst.put_u8(b'V');
492        }
493        &FieldArgument::ByteArray(ref _array) => {
494            dst.put_u8(b'x');
495            panic!("Fail to parse ByteArray") // I don't know how should I treat it
496        }
497    }
498}
499// }}}