amqpr_codec/frame/
decoder.rs1use bytes::{BytesMut, BigEndian, Buf};
2
3use std::io::{Cursor, Seek, SeekFrom};
4
5use frame::{Frame, FrameHeader, FrameType, FramePayload, FRAME_END_OCTET, method, content_body,
6 content_header};
7
8const FRAME_HEADER_BYTE_SIZE: usize = 7;
9
10pub fn decode_frame(src: &mut BytesMut) -> Option<Frame> {
11
12 debug!("Decode frame : {:?}", src);
13
14 match extract_frame_bytes(src) {
15 Some(mut frame_bytes) => {
16 debug!("Extracted a frame : {:?}", frame_bytes);
17
18 let (typ, channel, payload_size) =
19 decode_header(&mut frame_bytes.split_to(FRAME_HEADER_BYTE_SIZE));
20
21 debug!("frame type is {:?}", typ);
22 debug!("frame channel is {}", channel);
23 debug!("frame payload_size is {}", payload_size);
24
25 let payload = decode_payload(&typ, &mut frame_bytes.split_to(payload_size as usize));
26
27 let frame = Frame {
28 header: FrameHeader { channel: channel },
29 payload: payload,
30 };
31
32 debug!("Finish decoding frame : {:?}", frame);
33
34 Some(frame)
35 }
36 None => None,
37 }
38}
39
40
41fn extract_frame_bytes(src: &mut BytesMut) -> Option<BytesMut> {
48 if src.len() < 8 {
49 None
50 } else {
51 let mut cursor = Cursor::new(src);
52 cursor.seek(SeekFrom::Current(3_i64)).expect("Never fail");
53 let size = cursor.get_u32::<BigEndian>() as usize;
54
55 let src = cursor.into_inner();
56
57 if src.len() >= size + 8 {
58 let bytes = src.split_to(size + 8);
59
60 if !bytes.as_ref().ends_with(&[FRAME_END_OCTET]) {
62 panic!("Invalid Frame End");
63 }
64
65 Some(bytes)
66 } else {
67 None
68 }
69 }
70}
71
72
73fn decode_header(bytes: &mut BytesMut) -> (FrameType, u16, u32) {
78 let mut cursor = Cursor::new(bytes);
79 let typ = match cursor.get_u8() {
80 1 => FrameType::Method,
81 2 => FrameType::ContentHeader,
82 3 => FrameType::ContentBody,
83 4 | 8 => FrameType::Heartbeat, b => unreachable!("Unexpected frame type '{}' is received", b),
85 };
86 let channel = cursor.get_u16::<BigEndian>();
87 let size = cursor.get_u32::<BigEndian>();
88
89 (typ, channel, size)
90}
91
92
93fn decode_payload(typ: &FrameType, bytes: &mut BytesMut) -> FramePayload {
99 use self::FrameType::*;
100 let payload = match *typ {
101 Method => FramePayload::Method(method::decoder::decode_payload(bytes)),
102 ContentHeader => FramePayload::ContentHeader(content_header::decode_payload(bytes)),
103 ContentBody => FramePayload::ContentBody(content_body::decode_payload(bytes)),
104 Heartbeat => FramePayload::Heartbeat,
105 };
106 payload
107}