rabbitmq_stream_protocol/message/amqp/
header.rs1use byteorder::{BigEndian, WriteBytesExt};
2
3use crate::message::amqp::{
4 codec::constants::MESSAGE_HEADER,
5 types::{Descriptor, UInt},
6};
7
8use super::{
9 codec::constants::TypeCode,
10 types::Milliseconds,
11 types::{list_decoder, Boolean, List},
12 AmqpDecodeError, AmqpDecoder, AmqpEncodeError, AmqpEncoder,
13};
14#[derive(Clone, Debug, PartialEq, Eq)]
16#[cfg_attr(test, derive(fake::Dummy))]
17pub struct Header {
18 pub durable: bool,
19 pub priority: u8,
20 pub ttl: Option<Milliseconds>,
21 pub first_acquirer: bool,
22 pub delivery_count: u32,
23}
24
25impl Default for Header {
26 fn default() -> Self {
27 Self {
28 durable: Default::default(),
29 priority: 4,
30 ttl: Default::default(),
31 first_acquirer: Default::default(),
32 delivery_count: 0,
33 }
34 }
35}
36
37impl Header {
38 fn content_size(&self) -> u32 {
39 self.durable.encoded_size()
40 + self.priority.encoded_size()
41 + self.ttl.encoded_size()
42 + self.first_acquirer.encoded_size()
43 + self.delivery_count.encoded_size()
44 }
45}
46
47impl AmqpEncoder for Header {
48 fn encoded_size(&self) -> u32 {
49 let size = self.content_size() + MESSAGE_HEADER.encoded_size();
50 let fixed = if size > u8::MAX as u32 { 9 } else { 3 };
51 fixed + size
52 }
53
54 fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> {
55 MESSAGE_HEADER.encode(writer)?;
56
57 let content_size = self.content_size();
58
59 if content_size + MESSAGE_HEADER.encoded_size() > u8::MAX as u32 {
60 TypeCode::List8.encode(writer)?;
61 writer.write_u32::<BigEndian>(content_size + 4)?;
62 writer.write_u32::<BigEndian>(5)?;
63 } else {
64 TypeCode::List8.encode(writer)?;
65 writer.write_u8((content_size + 4) as u8)?;
66 writer.write_u8(5)?;
67 }
68 self.durable.encode(writer)?;
69 self.priority.encode(writer)?;
70 self.ttl.encode(writer)?;
71 self.first_acquirer.encode(writer)?;
72 self.delivery_count.encode(writer)?;
73 Ok(())
74 }
75}
76
77impl AmqpDecoder for Header {
78 fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> {
79 match Descriptor::decode(input)? {
80 (input, MESSAGE_HEADER) => {
81 let header = Header::default();
82 List::decode_with_fields(input, list_decoder_header, header)
83 }
84 (_, descriptor) => Err(AmqpDecodeError::MessageParse(format!(
85 "Invalid descriptor for header {:?}",
86 descriptor
87 ))),
88 }
89 }
90}
91
92list_decoder!(Header, list_decoder_header,
93 {
94 0 => { durable, Boolean, false},
95 1 => { priority, u8, 4},
96 2 => { ttl, u32, None, true},
97 3 => { first_acquirer, Boolean, false},
98 4 => { delivery_count, UInt, 4}
99 }
100);
101
102#[cfg(test)]
103mod tests {
104 use crate::message::amqp::tests::type_encode_decode_test_fuzzy;
105
106 use super::Header;
107
108 #[test]
109 fn test_header_encode_decode() {
110 type_encode_decode_test_fuzzy::<Header>()
111 }
112}