rabbitmq_stream_protocol/message/amqp/
properties.rs1use byteorder::{BigEndian, WriteBytesExt};
2
3use crate::message::amqp::codec::constants::TypeCode;
4use crate::message::amqp::codec::constants::MESSAGE_PROPERTIES;
5use crate::message::amqp::types::list_decoder;
6use crate::message::amqp::types::Binary;
7use crate::message::amqp::types::Descriptor;
8use crate::message::amqp::types::List;
9use crate::message::amqp::types::Str;
10use crate::message::amqp::AmqpDecodeError;
11
12use super::AmqpDecoder;
13use super::{
14 types::{Address, MessageId, SequenceNo, Symbol, Timestamp},
15 AmqpEncodeError, AmqpEncoder,
16};
17
18#[derive(Clone, Debug, PartialEq, Eq, Default)]
20#[cfg_attr(test, derive(fake::Dummy))]
21pub struct Properties {
22 pub message_id: Option<MessageId>,
23 pub user_id: Option<Vec<u8>>,
24 pub to: Option<Address>,
25 pub subject: Option<String>,
26 pub reply_to: Option<Address>,
27 pub correlation_id: Option<MessageId>,
28 pub content_type: Option<Symbol>,
29 pub content_encoding: Option<Symbol>,
30 pub absolute_expiry_time: Option<Timestamp>,
31 pub creation_time: Option<Timestamp>,
32 pub group_id: Option<String>,
33 pub group_sequence: Option<SequenceNo>,
34 pub reply_to_group_id: Option<String>,
35}
36
37impl Properties {
38 fn content_size(&self) -> u32 {
39 self.message_id.encoded_size()
40 + self.user_id.encoded_size()
41 + self.to.encoded_size()
42 + self.subject.encoded_size()
43 + self.reply_to.encoded_size()
44 + self.correlation_id.encoded_size()
45 + self.content_type.encoded_size()
46 + self.content_encoding.encoded_size()
47 + self.absolute_expiry_time.encoded_size()
48 + self.creation_time.encoded_size()
49 + self.group_id.encoded_size()
50 + self.group_sequence.encoded_size()
51 + self.reply_to_group_id.encoded_size()
52 }
53}
54impl AmqpEncoder for Properties {
55 fn encoded_size(&self) -> u32 {
56 let size = MESSAGE_PROPERTIES.encoded_size() + self.content_size();
57
58 let header = if size > u8::MAX as u32 { 9 } else { 3 };
59
60 header + size
61 }
62
63 fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> {
64 MESSAGE_PROPERTIES.encode(writer)?;
65
66 let content_size = self.content_size();
67
68 if content_size + 1 > u8::MAX as u32 {
69 TypeCode::List8.encode(writer)?;
70 writer.write_u32::<BigEndian>(content_size + 4)?;
71 writer.write_u32::<BigEndian>(13)?;
72 } else {
73 TypeCode::List8.encode(writer)?;
74 writer.write_u8((content_size + 1) as u8)?;
75 writer.write_u8(13)?;
76 }
77 self.message_id.encode(writer)?;
78 self.user_id.encode(writer)?;
79 self.to.encode(writer)?;
80 self.subject.encode(writer)?;
81 self.reply_to.encode(writer)?;
82 self.correlation_id.encode(writer)?;
83 self.content_type.encode(writer)?;
84 self.content_encoding.encode(writer)?;
85 self.absolute_expiry_time.encode(writer)?;
86 self.creation_time.encode(writer)?;
87 self.group_id.encode(writer)?;
88 self.group_sequence.encode(writer)?;
89 self.reply_to_group_id.encode(writer)?;
90
91 Ok(())
92 }
93}
94impl AmqpDecoder for Properties {
95 fn decode(input: &[u8]) -> Result<(&[u8], Self), super::AmqpDecodeError> {
96 match Descriptor::decode(input)? {
97 (input, MESSAGE_PROPERTIES) => {
98 let properties = Properties::default();
99 List::decode_with_fields(input, list_decoder_properties, properties)
100 }
101 (_, descriptor) => Err(AmqpDecodeError::MessageParse(format!(
102 "Invalid descriptor for properties {:?}",
103 descriptor
104 ))),
105 }
106 }
107}
108
109list_decoder!(Properties, list_decoder_properties,
110 {
111 0 => { message_id, MessageId, None, true},
112 1 => { user_id, Binary, None, true},
113 2 => { to, Address, None, true},
114 3 => { subject, Str, None, true},
115 4 => { reply_to, Address, None, true},
116 5 => { correlation_id, MessageId, None, true},
117 6 => { content_type, Symbol, None, true},
118 7 => { content_encoding, Symbol, None, true},
119 8 => { absolute_expiry_time, Timestamp, None, true},
120 9 => { creation_time, Timestamp, None, true},
121 10 => { group_id, Str, None, true},
122 11 => { group_sequence, SequenceNo, None, true},
123 12 => { reply_to_group_id, Str, None, true}
124 }
125);
126
#[cfg(test)]
mod tests {
    /// Runs the shared fuzzy encode/decode test helper for `Properties`.
    #[test]
    fn test_properties_encode_decode() {
        crate::message::amqp::tests::type_encode_decode_test_fuzzy::<super::Properties>()
    }
}