use bytes::{BytesMut, BigEndian, Buf};
use std::io::Cursor;
use std::collections::HashMap;
use frame::method::{MethodPayload, ConnectionClass, ChannelClass, ExchangeClass, QueueClass,
BasicClass, TxClass};
use args::*;
pub fn decode_payload(bytes: &mut BytesMut) -> MethodPayload {
let mut cursor = Cursor::new(bytes.split_off(4).freeze());
let class_id = cursor.get_u16::<BigEndian>();
debug!("class_id is {}", class_id);
let method_id = cursor.get_u16::<BigEndian>();
debug!("method_id is {}", method_id);
drop(cursor);
match class_id {
10 => MethodPayload::Connection(decode_connection_class(method_id, bytes)),
20 => MethodPayload::Channel(decode_channel_class(method_id, bytes)),
40 => MethodPayload::Exchange(decode_exchange_class(method_id, bytes)),
50 => MethodPayload::Queue(decode_queue_class(method_id, bytes)),
60 => MethodPayload::Basic(decode_basic_class(method_id, bytes)),
90 => MethodPayload::Tx(decode_tx_class(method_id, bytes)),
c => unreachable!("Unexpected class id {}", c),
}
}
fn decode_connection_class(method_id: u16, bytes: &mut BytesMut) -> ConnectionClass {
use frame::method::connection::*;
use self::ConnectionClass::*;
match method_id {
10 => Start(StartMethod {
version_major: decode_octet(bytes),
version_minor: decode_octet(bytes),
server_properties: decode_field_table(bytes),
mechanisms: decode_long_str(bytes),
locales: decode_long_str(bytes),
}),
20 => Secure(SecureMethod { challenge: decode_long_str(bytes) }),
30 => Tune(TuneMethod {
channel_max: decode_short(bytes),
frame_max: decode_long(bytes),
heartbeat: decode_short(bytes),
}),
41 => OpenOk(OpenOkMethod { reserved1: decode_short_str(bytes) }),
50 => Close(CloseMethod {
reply_code: decode_short(bytes),
reply_text: decode_short_str(bytes),
class_id: decode_short(bytes),
method_id: decode_short(bytes),
}),
51 => CloseOk,
60 => Blocked(BlockedMethod { reason: decode_short_str(bytes) }),
61 => Unblocked,
m => unreachable!("Unexpected method id {} in Connection class", m),
}
}
fn decode_channel_class(method_id: u16, bytes: &mut BytesMut) -> ChannelClass {
use frame::method::channel::*;
use self::ChannelClass::*;
match method_id {
11 => OpenOk(OpenOkMethod { reserved1: decode_long_str(bytes) }),
20 => Flow(FlowMethod { active: decode_bool_1(bytes) }),
21 => FlowOk(FlowOkMethod { active: decode_bool_1(bytes) }),
40 => Close(CloseMethod {
reply_code: decode_short(bytes),
reply_text: decode_short_str(bytes),
class_id: decode_short(bytes),
method_id: decode_short(bytes),
}),
41 => CloseOk,
m => unreachable!("Unexpected method id {} in Channel class", m),
}
}
fn decode_exchange_class(method_id: u16, _payload: &mut BytesMut) -> ExchangeClass {
use self::ExchangeClass::*;
match method_id {
11 => DeclareOk,
21 => DeleteOk,
31 => BindOk, 51 => UnbindOk, m => unreachable!("Unexpected method id {} in Channel class", m),
}
}
fn decode_queue_class(method_id: u16, bytes: &mut BytesMut) -> QueueClass {
use frame::method::queue::*;
use self::QueueClass::*;
match method_id {
11 => DeclareOk(DeclareOkMethod {
queue: decode_short_str(bytes),
message_count: decode_long(bytes),
consumer_count: decode_long(bytes),
}),
21 => BindOk,
31 => PurgeOk(PurgeOkMethod { message_count: decode_long(bytes) }),
41 => DeleteOk(DeleteOkMethod { message_count: decode_long(bytes) }),
51 => UnbindOk,
m => unreachable!("Unexpected method id {} in Channel class", m),
}
}
fn decode_basic_class(method_id: u16, bytes: &mut BytesMut) -> BasicClass {
use frame::method::basic::*;
use self::BasicClass::*;
match method_id {
11 => QosOk,
21 => ConsumeOk(ConsumeOkMethod { consumer_tag: decode_short_str(bytes) }),
31 => CancelOk(CancelOkMethod { consumer_tag: decode_short_str(bytes) }),
50 => Return(ReturnMethod {
reply_code: decode_short(bytes),
reply_text: decode_short_str(bytes),
exchange: decode_short_str(bytes),
routing_key: decode_short_str(bytes),
}),
60 => Deliver(DeliverMethod {
consumer_tag: decode_short_str(bytes),
delivery_tag: decode_longlong(bytes),
redeliverd: decode_bool_1(bytes),
exchange: decode_short_str(bytes),
routing_key: decode_short_str(bytes),
}),
71 => GetOk(GetOkMethod {
delivery_tag: decode_longlong(bytes),
redeliverd: decode_bool_1(bytes),
exchange: decode_short_str(bytes),
routing_key: decode_short_str(bytes),
message_count: decode_long(bytes),
}),
72 => GetEmpty(GetEmptyMethod { reserved1: decode_short_str(bytes) }),
80 => Ack(AckMethod {
delivery_tag: decode_longlong(bytes),
multiple: decode_bool_1(bytes),
}),
120 => Nack(NackMethod {
delivery_tag: decode_longlong(bytes),
multiple: decode_bool_1(bytes),
}),
m => unreachable!("Unexpected method id {} in Channel class", m),
}
}
fn decode_tx_class(method_id: u16, _bytes: &mut BytesMut) -> TxClass {
use self::TxClass::*;
match method_id {
11 => SelectOk,
21 => CommitOk,
31 => RollbackOk,
m => unreachable!("Unexpected method id {} in Connection class", m),
}
}
fn decode_bool_1(bytes: &mut BytesMut) -> bool {
bytes.split_to(1)[0] & 0b_1000_0000 == 0b_1000_0000
}
fn decode_octet(bytes: &mut BytesMut) -> u8 {
bytes.split_to(1)[0]
}
fn decode_short(bytes: &mut BytesMut) -> u16 {
Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>()
}
fn decode_long(bytes: &mut BytesMut) -> u32 {
Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>()
}
fn decode_longlong(bytes: &mut BytesMut) -> u64 {
Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>()
}
fn decode_short_str(bytes: &mut BytesMut) -> AmqpString {
let len = bytes.split_to(1)[0];
AmqpString(bytes.split_to(len as usize).freeze())
}
fn decode_long_str(bytes: &mut BytesMut) -> AmqpString {
let len = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>();
AmqpString(bytes.split_to(len as usize).freeze())
}
pub(crate) fn decode_field_table(bytes: &mut BytesMut) -> HashMap<AmqpString, FieldArgument> {
debug!("decode field table");
let size = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>() as u64;
let mut bytes = bytes.split_to(size as usize);
let mut table = HashMap::new();
while bytes.len() > 0 {
let item_name = decode_short_str(&mut bytes);
let item_value = decode_field_item_value(&mut bytes);
table.insert(item_name, item_value);
}
table
}
fn decode_field_item_value(bytes: &mut BytesMut) -> FieldArgument {
let flag = bytes.split_to(1)[0];
match flag {
0x74 => FieldArgument::Boolean(bytes.split_to(1)[0] == 0x01),
0x62 => FieldArgument::SignedOctet(Cursor::new(bytes.split_to(1)).get_i8()),
0x42 => FieldArgument::UnsignedOctet(bytes.split_to(1)[0]),
0x55 => FieldArgument::SignedShort(Cursor::new(bytes.split_to(2)).get_i16::<BigEndian>()),
0x75 => FieldArgument::UnsignedShort(Cursor::new(bytes.split_to(2)).get_u16::<BigEndian>()),
0x49 => FieldArgument::SignedLong(Cursor::new(bytes.split_to(4)).get_i32::<BigEndian>()),
0x69 => FieldArgument::UnsignedLong(Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>()),
0x4C => FieldArgument::SignedLongLong(
Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>(),
),
0x6C => FieldArgument::UnsignedLongLong(
Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>(),
),
0x66 => FieldArgument::Float(Cursor::new(bytes.split_to(4)).get_f32::<BigEndian>()),
0x63 => FieldArgument::Double(Cursor::new(bytes.split_to(8)).get_f64::<BigEndian>()),
0x44 => FieldArgument::Decimal(Cursor::new(bytes.split_to(8)).get_i64::<BigEndian>()),
0x73 => {
let len = bytes.split_to(1)[0];
FieldArgument::ShortString(AmqpString(bytes.split_to(len as usize).freeze()))
}
0x53 => {
let len = Cursor::new(bytes.split_to(4)).get_u32::<BigEndian>();
FieldArgument::LongString(AmqpString(bytes.split_to(len as usize).freeze()))
}
0x54 => FieldArgument::Timestamp(Cursor::new(bytes.split_to(8)).get_u64::<BigEndian>()),
0x46 => FieldArgument::NestedTable(decode_field_table(bytes)),
0x56 => FieldArgument::Void,
0x78 => panic!(), b => unreachable!("Unexpected byte {} at decode_field_item_value", b),
}
}