amqpr_codec/frame/
decoder.rs

1use bytes::{BytesMut, BigEndian, Buf};
2
3use std::io::{Cursor, Seek, SeekFrom};
4
5use frame::{Frame, FrameHeader, FrameType, FramePayload, FRAME_END_OCTET, method, content_body,
6            content_header};
7
8const FRAME_HEADER_BYTE_SIZE: usize = 7;
9
10pub fn decode_frame(src: &mut BytesMut) -> Option<Frame> {
11
12    debug!("Decode frame : {:?}", src);
13
14    match extract_frame_bytes(src) {
15        Some(mut frame_bytes) => {
16            debug!("Extracted a frame : {:?}", frame_bytes);
17
18            let (typ, channel, payload_size) =
19                decode_header(&mut frame_bytes.split_to(FRAME_HEADER_BYTE_SIZE));
20
21            debug!("frame type is {:?}", typ);
22            debug!("frame channel is {}", channel);
23            debug!("frame payload_size is {}", payload_size);
24
25            let payload = decode_payload(&typ, &mut frame_bytes.split_to(payload_size as usize));
26
27            let frame = Frame {
28                header: FrameHeader { channel: channel },
29                payload: payload,
30            };
31
32            debug!("Finish decoding frame : {:?}", frame);
33
34            Some(frame)
35        }
36        None => None,
37    }
38}
39
40
41/// Extract a frame bytes.
42/// If there is not enough length to make frame, this function returns None.
43/// If there is enough length, this function extract it after check frame end.
44///
45/// # Panics
46/// If frame end is invalid
47fn extract_frame_bytes(src: &mut BytesMut) -> Option<BytesMut> {
48    if src.len() < 8 {
49        None
50    } else {
51        let mut cursor = Cursor::new(src);
52        cursor.seek(SeekFrom::Current(3_i64)).expect("Never fail");
53        let size = cursor.get_u32::<BigEndian>() as usize;
54
55        let src = cursor.into_inner();
56
57        if src.len() >= size + 8 {
58            let bytes = src.split_to(size + 8);
59
60            // Check frame end
61            if !bytes.as_ref().ends_with(&[FRAME_END_OCTET]) {
62                panic!("Invalid Frame End");
63            }
64
65            Some(bytes)
66        } else {
67            None
68        }
69    }
70}
71
72
73/// Decode frame header. This function returns tuple of (type_octet, channel_id, body_size).
74///
75/// # Panics
76/// when `src` does not have enough length.
77fn decode_header(bytes: &mut BytesMut) -> (FrameType, u16, u32) {
78    let mut cursor = Cursor::new(bytes);
79    let typ = match cursor.get_u8() {
80        1 => FrameType::Method,
81        2 => FrameType::ContentHeader,
82        3 => FrameType::ContentBody,
83        4 | 8 => FrameType::Heartbeat, // RabbitMQ sends heartbeat frame starting with 8
84        b => unreachable!("Unexpected frame type '{}' is received", b),
85    };
86    let channel = cursor.get_u16::<BigEndian>();
87    let size = cursor.get_u32::<BigEndian>();
88
89    (typ, channel, size)
90}
91
92
93/// Decode frame payload with `FrameType`.
94/// You **MUTS** gime `Bytes` which has **EXACT* length of payload (without frame-end).
95///
96/// # Panics
97/// when `payload` does not have enough length.
98fn decode_payload(typ: &FrameType, bytes: &mut BytesMut) -> FramePayload {
99    use self::FrameType::*;
100    let payload = match *typ {
101        Method => FramePayload::Method(method::decoder::decode_payload(bytes)),
102        ContentHeader => FramePayload::ContentHeader(content_header::decode_payload(bytes)),
103        ContentBody => FramePayload::ContentBody(content_body::decode_payload(bytes)),
104        Heartbeat => FramePayload::Heartbeat,
105    };
106    payload
107}