rabbitmq_stream_protocol/message/amqp/
header.rs

1use byteorder::{BigEndian, WriteBytesExt};
2
3use crate::message::amqp::{
4    codec::constants::MESSAGE_HEADER,
5    types::{Descriptor, UInt},
6};
7
8use super::{
9    codec::constants::TypeCode,
10    types::Milliseconds,
11    types::{list_decoder, Boolean, List},
12    AmqpDecodeError, AmqpDecoder, AmqpEncodeError, AmqpEncoder,
13};
14/// Header of the message
15#[derive(Clone, Debug, PartialEq, Eq)]
16#[cfg_attr(test, derive(fake::Dummy))]
17pub struct Header {
18    pub durable: bool,
19    pub priority: u8,
20    pub ttl: Option<Milliseconds>,
21    pub first_acquirer: bool,
22    pub delivery_count: u32,
23}
24
25impl Default for Header {
26    fn default() -> Self {
27        Self {
28            durable: Default::default(),
29            priority: 4,
30            ttl: Default::default(),
31            first_acquirer: Default::default(),
32            delivery_count: 0,
33        }
34    }
35}
36
37impl Header {
38    fn content_size(&self) -> u32 {
39        self.durable.encoded_size()
40            + self.priority.encoded_size()
41            + self.ttl.encoded_size()
42            + self.first_acquirer.encoded_size()
43            + self.delivery_count.encoded_size()
44    }
45}
46
47impl AmqpEncoder for Header {
48    fn encoded_size(&self) -> u32 {
49        let size = self.content_size() + MESSAGE_HEADER.encoded_size();
50        let fixed = if size > u8::MAX as u32 { 9 } else { 3 };
51        fixed + size
52    }
53
54    fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> {
55        MESSAGE_HEADER.encode(writer)?;
56
57        let content_size = self.content_size();
58
59        if content_size + MESSAGE_HEADER.encoded_size() > u8::MAX as u32 {
60            TypeCode::List8.encode(writer)?;
61            writer.write_u32::<BigEndian>(content_size + 4)?;
62            writer.write_u32::<BigEndian>(5)?;
63        } else {
64            TypeCode::List8.encode(writer)?;
65            writer.write_u8((content_size + 4) as u8)?;
66            writer.write_u8(5)?;
67        }
68        self.durable.encode(writer)?;
69        self.priority.encode(writer)?;
70        self.ttl.encode(writer)?;
71        self.first_acquirer.encode(writer)?;
72        self.delivery_count.encode(writer)?;
73        Ok(())
74    }
75}
76
77impl AmqpDecoder for Header {
78    fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> {
79        match Descriptor::decode(input)? {
80            (input, MESSAGE_HEADER) => {
81                let header = Header::default();
82                List::decode_with_fields(input, list_decoder_header, header)
83            }
84            (_, descriptor) => Err(AmqpDecodeError::MessageParse(format!(
85                "Invalid descriptor for header {:?}",
86                descriptor
87            ))),
88        }
89    }
90}
91
92list_decoder!(Header, list_decoder_header,
93    {
94     0 => { durable, Boolean, false},
95     1 => { priority, u8, 4},
96     2 => { ttl, u32, None, true},
97     3 => { first_acquirer, Boolean, false},
98     4 => { delivery_count, UInt, 4}
99    }
100);
101
#[cfg(test)]
mod tests {
    use super::Header;
    use crate::message::amqp::tests::type_encode_decode_test_fuzzy;

    /// Runs the shared fuzzy encode/decode test helper for `Header`.
    #[test]
    fn test_header_encode_decode() {
        type_encode_decode_test_fuzzy::<Header>();
    }
}