use amqp_error::{AMQPResult, AMQPError};
use std::io::{Read, Write, Cursor};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use enum_primitive::FromPrimitive;
enum_from_primitive! {
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum FrameType {
METHOD = 1,
HEADERS = 2,
BODY = 3,
HEARTBEAT = 8
}
}
impl Copy for FrameType {}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Frame {
pub frame_type: FrameType,
pub channel: u16,
pub payload: Vec<u8>,
}
impl Frame {
pub fn decode<T: Read>(reader: &mut T) -> AMQPResult<Frame> {
let mut header = [0u8; 7];
let read_len = try!(reader.read(&mut header));
if read_len != 7 {
return Err(AMQPError::FramingError(format!("Error reading frame header. Expected \
to read 7 bytes, but read {}",
read_len)));
}
let header = &mut &header[..];
let frame_type_id = try!(header.read_u8());
let channel = try!(header.read_u16::<BigEndian>());
let size = try!(header.read_u32::<BigEndian>()) as usize;
let mut payload: Vec<u8> = vec![0u8; size];
let read_len = try!(reader.read(&mut payload));
if read_len != size {
return Err(AMQPError::FramingError(format!("Error reading frame body. Expected to \
read {} bytes, but read {}",
size,
read_len)));
}
let frame_end = try!(reader.read_u8());
if frame_end != 0xCE {
return Err(AMQPError::DecodeError("Frame didn't end with 0xCE"));
}
let frame_type = match FrameType::from_u8(frame_type_id) {
Some(ft) => ft,
None => return Err(AMQPError::DecodeError("Unknown frame type")),
};
let frame = Frame {
frame_type: frame_type,
channel: channel,
payload: payload,
};
Ok(frame)
}
pub fn encode(&self) -> AMQPResult<Vec<u8>> {
let mut writer = vec![];
try!(writer.write_u8(self.frame_type as u8));
try!(writer.write_u16::<BigEndian>(self.channel));
try!(writer.write_u32::<BigEndian>(self.payload.len() as u32));
try!(writer.write_all(&self.payload));
try!(writer.write_u8(0xCE));
Ok(writer)
}
}
#[derive(Debug, Clone)]
pub struct ContentHeaderFrame {
pub content_class: u16,
pub weight: u16,
pub body_size: u64,
pub properties_flags: u16,
pub properties: Vec<u8>,
}
impl ContentHeaderFrame {
pub fn decode(frame: Frame) -> AMQPResult<ContentHeaderFrame> {
let mut reader = Cursor::new(frame.payload);
let content_class = try!(reader.read_u16::<BigEndian>());
let weight = try!(reader.read_u16::<BigEndian>()); let body_size = try!(reader.read_u64::<BigEndian>());
let properties_flags = try!(reader.read_u16::<BigEndian>());
let mut properties = vec![];
try!(reader.read_to_end(&mut properties));
Ok(ContentHeaderFrame {
content_class: content_class,
weight: weight,
body_size: body_size,
properties_flags: properties_flags,
properties: properties,
})
}
pub fn encode(&self) -> AMQPResult<Vec<u8>> {
let mut writer = vec![];
try!(writer.write_u16::<BigEndian>(self.content_class));
try!(writer.write_u16::<BigEndian>(self.weight)); try!(writer.write_u64::<BigEndian>(self.body_size));
try!(writer.write_u16::<BigEndian>(self.properties_flags));
try!(writer.write_all(&self.properties));
Ok(writer)
}
}
#[test]
fn test_encode_decode() {
let frame = Frame {
frame_type: FrameType::METHOD,
channel: 5,
payload: vec![1, 2, 3, 4, 5],
};
let frame_encoded = frame.encode().ok().unwrap();
assert_eq!(frame,
Frame::decode(&mut Cursor::new(frame_encoded)).ok().unwrap());
}