rabbitmq_stream_protocol/message/amqp/
properties.rs

1use byteorder::{BigEndian, WriteBytesExt};
2
3use crate::message::amqp::codec::constants::TypeCode;
4use crate::message::amqp::codec::constants::MESSAGE_PROPERTIES;
5use crate::message::amqp::types::list_decoder;
6use crate::message::amqp::types::Binary;
7use crate::message::amqp::types::Descriptor;
8use crate::message::amqp::types::List;
9use crate::message::amqp::types::Str;
10use crate::message::amqp::AmqpDecodeError;
11
12use super::AmqpDecoder;
13use super::{
14    types::{Address, MessageId, SequenceNo, Symbol, Timestamp},
15    AmqpEncodeError, AmqpEncoder,
16};
17
18/// Properties of the message
19#[derive(Clone, Debug, PartialEq, Eq, Default)]
20#[cfg_attr(test, derive(fake::Dummy))]
21pub struct Properties {
22    pub message_id: Option<MessageId>,
23    pub user_id: Option<Vec<u8>>,
24    pub to: Option<Address>,
25    pub subject: Option<String>,
26    pub reply_to: Option<Address>,
27    pub correlation_id: Option<MessageId>,
28    pub content_type: Option<Symbol>,
29    pub content_encoding: Option<Symbol>,
30    pub absolute_expiry_time: Option<Timestamp>,
31    pub creation_time: Option<Timestamp>,
32    pub group_id: Option<String>,
33    pub group_sequence: Option<SequenceNo>,
34    pub reply_to_group_id: Option<String>,
35}
36
37impl Properties {
38    fn content_size(&self) -> u32 {
39        self.message_id.encoded_size()
40            + self.user_id.encoded_size()
41            + self.to.encoded_size()
42            + self.subject.encoded_size()
43            + self.reply_to.encoded_size()
44            + self.correlation_id.encoded_size()
45            + self.content_type.encoded_size()
46            + self.content_encoding.encoded_size()
47            + self.absolute_expiry_time.encoded_size()
48            + self.creation_time.encoded_size()
49            + self.group_id.encoded_size()
50            + self.group_sequence.encoded_size()
51            + self.reply_to_group_id.encoded_size()
52    }
53}
54impl AmqpEncoder for Properties {
55    fn encoded_size(&self) -> u32 {
56        let size = MESSAGE_PROPERTIES.encoded_size() + self.content_size();
57
58        let header = if size > u8::MAX as u32 { 9 } else { 3 };
59
60        header + size
61    }
62
63    fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> {
64        MESSAGE_PROPERTIES.encode(writer)?;
65
66        let content_size = self.content_size();
67
68        if content_size + 1 > u8::MAX as u32 {
69            TypeCode::List8.encode(writer)?;
70            writer.write_u32::<BigEndian>(content_size + 4)?;
71            writer.write_u32::<BigEndian>(13)?;
72        } else {
73            TypeCode::List8.encode(writer)?;
74            writer.write_u8((content_size + 1) as u8)?;
75            writer.write_u8(13)?;
76        }
77        self.message_id.encode(writer)?;
78        self.user_id.encode(writer)?;
79        self.to.encode(writer)?;
80        self.subject.encode(writer)?;
81        self.reply_to.encode(writer)?;
82        self.correlation_id.encode(writer)?;
83        self.content_type.encode(writer)?;
84        self.content_encoding.encode(writer)?;
85        self.absolute_expiry_time.encode(writer)?;
86        self.creation_time.encode(writer)?;
87        self.group_id.encode(writer)?;
88        self.group_sequence.encode(writer)?;
89        self.reply_to_group_id.encode(writer)?;
90
91        Ok(())
92    }
93}
94impl AmqpDecoder for Properties {
95    fn decode(input: &[u8]) -> Result<(&[u8], Self), super::AmqpDecodeError> {
96        match Descriptor::decode(input)? {
97            (input, MESSAGE_PROPERTIES) => {
98                let properties = Properties::default();
99                List::decode_with_fields(input, list_decoder_properties, properties)
100            }
101            (_, descriptor) => Err(AmqpDecodeError::MessageParse(format!(
102                "Invalid descriptor for properties {:?}",
103                descriptor
104            ))),
105        }
106    }
107}
108
// Generates `list_decoder_properties`, the field decoder handed to
// `List::decode_with_fields` when decoding `Properties`.
// Each entry reads `index => { field, wire type, .., .. }`: the list index,
// the `Properties` field it fills, and the type used to decode the value
// (`Binary` for `user_id`, `Str` for the `String` fields).
// NOTE(review): the meaning of the trailing `None, true` arguments is defined
// by the `list_decoder!` macro, which is not visible here — confirm there.
list_decoder!(Properties, list_decoder_properties,
    {
     0 => { message_id, MessageId, None, true},
     1 => { user_id, Binary, None, true},
     2 => { to, Address, None, true},
     3 => { subject, Str, None, true},
     4 => { reply_to, Address, None, true},
     5 => { correlation_id, MessageId, None, true},
     6 => { content_type, Symbol, None, true},
     7 => { content_encoding, Symbol, None, true},
     8 => { absolute_expiry_time, Timestamp, None, true},
     9 => { creation_time, Timestamp, None, true},
     10 => { group_id, Str, None, true},
     11 => { group_sequence, SequenceNo, None, true},
     12 => { reply_to_group_id, Str, None, true}
    }
);
126
#[cfg(test)]
mod tests {
    use super::Properties;
    use crate::message::amqp::tests::type_encode_decode_test_fuzzy;

    /// Runs the shared fuzzy encode/decode test helper for `Properties`.
    #[test]
    fn test_properties_encode_decode() {
        type_encode_decode_test_fuzzy::<Properties>();
    }
}