amqpr_codec/frame/
encoder.rs

1use bytes::{BufMut, BytesMut, BigEndian};
2
3use frame::*;
4
5
// Frame type octets. `encode_frame` picks one per `FramePayload` variant and
// `encode_frame_inner` writes it as the first byte of the encoded frame.
// NOTE(review): values presumably follow the AMQP 0-9-1 frame type codes
// (method = 1, header = 2, body = 3, heartbeat = 8) - confirm against the spec.
const METHOD_TYPE_BYTE: u8 = 1;
const CONTENT_HEADER_TYPE_BYTE: u8 = 2;
const CONTENT_BODY_TYPE_BYTE: u8 = 3;
const HEARTBEAT_TYPE_BYTE: u8 = 8;
10
11
12pub fn encode_frame(item: Frame, dst: &mut BytesMut) {
13    debug!("Start encode frame : {:?}", item);
14
15    match item.payload {
16        FramePayload::Method(payload) => {
17            let payload = method::encoder::encode_payload(payload);
18            encode_frame_inner(METHOD_TYPE_BYTE, item.header.channel, payload.as_ref(), dst);
19        }
20        FramePayload::ContentHeader(payload) => {
21            let payload = content_header::encode_payload(payload);
22            encode_frame_inner(
23                CONTENT_HEADER_TYPE_BYTE,
24                item.header.channel,
25                payload.as_ref(),
26                dst,
27            );
28        }
29        FramePayload::ContentBody(payload) => {
30            let payload = content_body::encoder::encode_payload(payload);
31            encode_frame_inner(
32                CONTENT_BODY_TYPE_BYTE,
33                item.header.channel,
34                payload.as_ref(),
35                dst,
36            );
37        }
38        FramePayload::Heartbeat => {
39            encode_frame_inner(HEARTBEAT_TYPE_BYTE, item.header.channel, &[], dst);
40        }
41    };
42}
43
44
45fn encode_frame_inner(type_byte: u8, channel: u16, encoded_payload: &[u8], dst: &mut BytesMut) {
46    // put headers
47    dst.reserve(7);
48    dst.put_u8(type_byte);
49    dst.put_u16::<BigEndian>(channel);
50    dst.put_u32::<BigEndian>(encoded_payload.len() as u32);
51    debug!("byte size of payload is {}", encoded_payload.len());
52
53    // put payload
54    dst.extend_from_slice(encoded_payload);
55
56    // put frame-end
57    dst.reserve(1);
58    dst.put_u8(FRAME_END_OCTET);
59}