amq_proto/
framing.rs

1use error::*;
2use std::io::{Read, Write, Cursor};
3use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
4use enum_primitive::FromPrimitive;
5use method::EncodedMethod;
6
7enum_from_primitive! {
8#[derive(Debug, Clone, Eq, PartialEq)]
9pub enum FrameType {
10    METHOD = 1,
11    HEADERS = 2,
12    BODY  = 3,
13    HEARTBEAT = 8
14}
15}
16
17impl Copy for FrameType {}
18
/// Owned byte buffer holding the raw payload of a single frame.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FramePayload(Vec<u8>);

impl FramePayload {
    /// Wraps `data` as a payload, taking ownership of the buffer.
    pub fn new(data: Vec<u8>) -> FramePayload {
        FramePayload(data)
    }

    /// Consumes the payload and returns the underlying buffer.
    pub fn into_inner(self) -> Vec<u8> {
        let FramePayload(bytes) = self;
        bytes
    }

    /// Borrows the payload bytes.
    pub fn inner(&self) -> &[u8] {
        self.0.as_slice()
    }
}
35
/// Owned byte buffer holding still-encoded properties, as stored in the
/// `properties` field of `ContentHeaderFrame`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EncodedProperties(Vec<u8>);

impl EncodedProperties {
    /// Wraps `data`, taking ownership of the buffer.
    pub fn new(data: Vec<u8>) -> EncodedProperties {
        EncodedProperties(data)
    }

    /// Consumes the wrapper and returns the underlying buffer.
    pub fn into_inner(self) -> Vec<u8> {
        let EncodedProperties(bytes) = self;
        bytes
    }

    /// Borrows the encoded bytes.
    pub fn inner(&self) -> &[u8] {
        self.0.as_slice()
    }
}
52
53#[derive(Debug, Clone, Eq, PartialEq)]
54pub struct Frame {
55    pub frame_type: FrameType,
56    pub channel: u16,
57    pub payload: FramePayload,
58}
59
/// The fixed 7-byte prefix of every frame.
///
/// Layout (all multi-byte integers are big-endian):
/// byte 0 is the frame type id, bytes 1..3 the channel, bytes 3..7 the
/// payload size in bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    /// Raw frame type octet; not yet validated against `FrameType`.
    pub frame_type_id: u8,
    /// Channel number.
    pub channel: u16,
    /// Number of payload bytes that follow the header.
    pub payload_size: u32
}

impl FrameHeader {
    /// Parses a raw 7-byte header.
    ///
    /// The fields are read straight out of the fixed-size array, so parsing
    /// cannot fail and needs no `unwrap()`.
    pub fn new(header: [u8; 7]) -> Self {
        FrameHeader {
            frame_type_id: header[0],
            channel: u16::from_be_bytes([header[1], header[2]]),
            payload_size: u32::from_be_bytes([header[3], header[4], header[5], header[6]]),
        }
    }
}
75
76
77#[derive(Debug, Clone)]
78pub struct MethodFrame {
79    pub class_id: u16,
80    pub method_id: u16,
81    pub arguments: EncodedMethod
82}
83
84impl MethodFrame {
85    pub fn encode(&self) -> Result<FramePayload> {
86        let mut writer = Vec::with_capacity(self.arguments.inner().len() + 4);
87        try!(writer.write_u16::<BigEndian>(self.class_id));
88        try!(writer.write_u16::<BigEndian>(self.method_id));
89        try!(writer.write_all(self.arguments.inner()));
90        Ok(FramePayload::new(writer))
91    }
92
93    // We need this method, so we can match on class_id & method_id
94    pub fn decode(frame: &Frame) -> Result<MethodFrame> {
95        if frame.frame_type != FrameType::METHOD {
96            return Err(ErrorKind::Protocol("Not a method frame".to_string()).into())
97        }
98        let reader = &mut frame.payload.inner();
99        let class_id = try!(reader.read_u16::<BigEndian>());
100        let method_id = try!(reader.read_u16::<BigEndian>());
101        let mut arguments = vec![];
102        try!(reader.read_to_end(&mut arguments));
103        Ok(MethodFrame { class_id: class_id, method_id: method_id, arguments: EncodedMethod::new(arguments) })
104    }
105
106    pub fn method_name(&self) -> &'static str {
107        method_name(self)
108    }
109
110    pub fn carries_content(&self) -> bool {
111        method_carries_content(self)
112    }
113}
114include!("method_frame_methods.rs");
115
116
117unsafe impl Send for Frame {}
118
119impl Frame {
120    pub fn decode<T: Read>(reader: &mut T) -> Result<Frame> {
121        let mut header = [0u8; 7];
122        try!(reader.read_exact(&mut header));
123        let FrameHeader { frame_type_id, channel, payload_size } = FrameHeader::new(header);
124        let size = payload_size as usize;
125        // We need to use Vec because the size is not know in compile time.
126        let mut payload: Vec<u8> = vec![0u8; size];
127        try!(reader.read_exact(&mut payload));
128        let frame_end = try!(reader.read_u8());
129        if frame_end != 0xCE {
130            return Err(ErrorKind::Protocol("Frame didn't end with 0xCE".to_string()).into());
131        }
132        let frame_type = match FrameType::from_u8(frame_type_id) {
133            Some(ft) => ft,
134            None => return Err(ErrorKind::Protocol("Unknown frame type".to_string()).into()),
135        };
136
137        let frame = Frame {
138            frame_type: frame_type,
139            channel: channel,
140            payload: FramePayload::new(payload),
141        };
142        Ok(frame)
143    }
144
145    pub fn encode(&self) -> Result<Vec<u8>> {
146        let mut writer = Vec::with_capacity(self.payload.inner().len() + 8);
147        try!(writer.write_u8(self.frame_type as u8));
148        try!(writer.write_u16::<BigEndian>(self.channel));
149        try!(writer.write_u32::<BigEndian>(self.payload.inner().len() as u32));
150        try!(writer.write_all(self.payload.inner()));
151        try!(writer.write_u8(0xCE));
152        Ok(writer)
153    }
154}
155
156#[derive(Debug, Clone)]
157pub struct ContentHeaderFrame {
158    pub content_class: u16,
159    pub weight: u16,
160    pub body_size: u64,
161    pub properties_flags: u16,
162    pub properties: EncodedProperties,
163}
164
165impl ContentHeaderFrame {
166    pub fn decode(frame: &Frame) -> Result<ContentHeaderFrame> {
167        let mut reader = Cursor::new(frame.payload.inner());
168        let content_class = try!(reader.read_u16::<BigEndian>());
169        let weight = try!(reader.read_u16::<BigEndian>()); //0 all the time for now
170        let body_size = try!(reader.read_u64::<BigEndian>());
171        let properties_flags = try!(reader.read_u16::<BigEndian>());
172        let mut properties = vec![];
173        try!(reader.read_to_end(&mut properties));
174        Ok(ContentHeaderFrame {
175            content_class: content_class,
176            weight: weight,
177            body_size: body_size,
178            properties_flags: properties_flags,
179            properties: EncodedProperties::new(properties),
180        })
181    }
182
183    pub fn encode(&self) -> Result<Vec<u8>> {
184        let mut writer = Vec::with_capacity(self.properties.inner().len() + 14);;
185        try!(writer.write_u16::<BigEndian>(self.content_class));
186        try!(writer.write_u16::<BigEndian>(self.weight)); //0 all the time for now
187        try!(writer.write_u64::<BigEndian>(self.body_size));
188        try!(writer.write_u16::<BigEndian>(self.properties_flags));
189        try!(writer.write_all(self.properties.inner()));
190        Ok(writer)
191    }
192}
193
#[test]
fn test_encode_decode() {
    // Round trip: a frame that is encoded and decoded again must compare
    // equal to the frame we started with.
    let original = Frame {
        frame_type: FrameType::METHOD,
        channel: 5,
        payload: FramePayload::new(vec![1, 2, 3, 4, 5]),
    };
    let bytes = original.encode().ok().unwrap();
    let mut input = Cursor::new(bytes);
    let decoded = Frame::decode(&mut input).ok().unwrap();
    assert_eq!(original, decoded);
}
202    assert_eq!(frame,
203               Frame::decode(&mut Cursor::new(frame_encoded)).ok().unwrap());
204}