amqp_codec/message/
inmessage.rs1use std::cell::Cell;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use fxhash::FxHashMap;
5
6use crate::codec::{Decode, Encode, FORMATCODE_BINARY8};
7use crate::errors::AmqpParseError;
8use crate::protocol::{
9 Annotations, Header, MessageFormat, Properties, Section, StringVariantMap, TransferBody,
10};
11use crate::types::{Descriptor, Str, Variant};
12
13use super::body::MessageBody;
14use super::outmessage::OutMessage;
15use super::SECTION_PREFIX_LENGTH;
16
17#[derive(Debug, Clone, Default, PartialEq)]
18pub struct InMessage {
19 pub message_format: Option<MessageFormat>,
20 pub(super) header: Option<Header>,
21 pub(super) delivery_annotations: Option<Annotations>,
22 pub(super) message_annotations: Option<Annotations>,
23 pub(super) properties: Option<Properties>,
24 pub(super) application_properties: Option<StringVariantMap>,
25 pub(super) footer: Option<Annotations>,
26 pub(super) body: MessageBody,
27 pub(super) size: Cell<usize>,
28}
29
30impl InMessage {
31 pub fn with_body(body: Bytes) -> InMessage {
33 let mut msg = InMessage::default();
34 msg.body.data.push(body);
35 msg
36 }
37
38 pub fn with_messages(messages: Vec<TransferBody>) -> InMessage {
40 let mut msg = InMessage::default();
41 msg.body.messages = messages;
42 msg
43 }
44
45 pub fn header(&self) -> Option<&Header> {
47 self.header.as_ref()
48 }
49
50 pub fn set_header(mut self, header: Header) -> Self {
52 self.header = Some(header);
53 self.size.set(0);
54 self
55 }
56
57 pub fn properties(&self) -> Option<&Properties> {
59 self.properties.as_ref()
60 }
61
62 pub fn set_properties<F>(mut self, f: F) -> Self
64 where
65 F: Fn(&mut Properties),
66 {
67 if let Some(ref mut props) = self.properties {
68 f(props);
69 } else {
70 let mut props = Properties::default();
71 f(&mut props);
72 self.properties = Some(props);
73 }
74 self.size.set(0);
75 self
76 }
77
78 pub fn app_property(&self, key: &str) -> Option<&Variant> {
80 if let Some(ref props) = self.application_properties {
81 props.get(key)
82 } else {
83 None
84 }
85 }
86
87 pub fn app_properties(&self) -> Option<&StringVariantMap> {
89 self.application_properties.as_ref()
90 }
91
92 pub fn message_annotation(&self, key: &str) -> Option<&Variant> {
94 if let Some(ref props) = self.message_annotations {
95 props.get(key)
96 } else {
97 None
98 }
99 }
100
101 pub fn set_app_property<K: Into<Str>, V: Into<Variant>>(mut self, key: K, value: V) -> Self {
103 if let Some(ref mut props) = self.application_properties {
104 props.insert(key.into(), value.into());
105 } else {
106 let mut props = FxHashMap::default();
107 props.insert(key.into(), value.into());
108 self.application_properties = Some(props);
109 }
110 self.size.set(0);
111 self
112 }
113
114 pub fn update<F>(self, f: F) -> Self
116 where
117 F: Fn(Self) -> Self,
118 {
119 self.size.set(0);
120 f(self)
121 }
122
123 pub fn if_some<T, F>(self, value: &Option<T>, f: F) -> Self
125 where
126 F: Fn(Self, &T) -> Self,
127 {
128 if let Some(ref val) = value {
129 self.size.set(0);
130 f(self, val)
131 } else {
132 self
133 }
134 }
135
136 pub fn body(&self) -> &MessageBody {
138 &self.body
139 }
140
141 pub fn value(&self) -> Option<&Variant> {
143 self.body.value.as_ref()
144 }
145
146 pub fn set_value<V: Into<Variant>>(mut self, v: V) -> Self {
148 self.body.value = Some(v.into());
149 self
150 }
151
152 pub fn set_body<F>(mut self, f: F) -> Self
154 where
155 F: Fn(&mut MessageBody),
156 {
157 f(&mut self.body);
158 self.size.set(0);
159 self
160 }
161
162 pub fn reply_message(&self) -> OutMessage {
164 let mut msg = OutMessage::default().if_some(&self.properties, |mut msg, data| {
165 msg.set_properties(|props| props.correlation_id = data.message_id.clone());
166 msg
167 });
168 msg.message_format = self.message_format;
169 msg
170 }
171}
172
173impl Decode for InMessage {
174 fn decode(mut input: &[u8]) -> Result<(&[u8], InMessage), AmqpParseError> {
175 let mut message = InMessage::default();
176
177 loop {
178 let (buf, sec) = Section::decode(input)?;
179 match sec {
180 Section::Header(val) => {
181 message.header = Some(val);
182 }
183 Section::DeliveryAnnotations(val) => {
184 message.delivery_annotations = Some(val);
185 }
186 Section::MessageAnnotations(val) => {
187 message.message_annotations = Some(val);
188 }
189 Section::ApplicationProperties(val) => {
190 message.application_properties = Some(val);
191 }
192 Section::Footer(val) => {
193 message.footer = Some(val);
194 }
195 Section::Properties(val) => {
196 message.properties = Some(val);
197 }
198
199 Section::AmqpSequence(val) => {
201 message.body.sequence.push(val);
202 }
203 Section::AmqpValue(val) => {
204 message.body.value = Some(val);
205 }
206 Section::Data(val) => {
207 message.body.data.push(val);
208 }
209 }
210 if buf.is_empty() {
211 break;
212 }
213 input = buf;
214 }
215 Ok((input, message))
216 }
217}
218
219impl Encode for InMessage {
220 fn encoded_size(&self) -> usize {
221 let size = self.size.get();
222 if size != 0 {
223 return size;
224 }
225
226 let body_size = self.body.encoded_size();
228 let mut size = if body_size == 0 {
229 SECTION_PREFIX_LENGTH + 2
231 } else {
232 body_size
233 };
234
235 if let Some(ref h) = self.header {
236 size += h.encoded_size() + SECTION_PREFIX_LENGTH;
237 }
238 if let Some(ref da) = self.delivery_annotations {
239 size += da.encoded_size() + SECTION_PREFIX_LENGTH;
240 }
241 if let Some(ref ma) = self.message_annotations {
242 size += ma.encoded_size() + SECTION_PREFIX_LENGTH;
243 }
244 if let Some(ref p) = self.properties {
245 size += p.encoded_size();
246 }
247 if let Some(ref ap) = self.application_properties {
248 size += ap.encoded_size() + SECTION_PREFIX_LENGTH;
249 }
250 if let Some(ref f) = self.footer {
251 size += f.encoded_size() + SECTION_PREFIX_LENGTH;
252 }
253 self.size.set(size);
254 size
255 }
256
257 fn encode(&self, dst: &mut BytesMut) {
258 if let Some(ref h) = self.header {
259 h.encode(dst);
260 }
261 if let Some(ref da) = self.delivery_annotations {
262 Descriptor::Ulong(113).encode(dst);
263 da.encode(dst);
264 }
265 if let Some(ref ma) = self.message_annotations {
266 Descriptor::Ulong(114).encode(dst);
267 ma.encode(dst);
268 }
269 if let Some(ref p) = self.properties {
270 p.encode(dst);
271 }
272 if let Some(ref ap) = self.application_properties {
273 Descriptor::Ulong(116).encode(dst);
274 ap.encode(dst);
275 }
276
277 if self.body.encoded_size() == 0 {
279 Descriptor::Ulong(117).encode(dst);
281 dst.put_u8(FORMATCODE_BINARY8);
282 dst.put_u8(0);
283 } else {
284 self.body.encode(dst);
285 }
286
287 if let Some(ref f) = self.footer {
289 Descriptor::Ulong(120).encode(dst);
290 f.encode(dst);
291 }
292 }
293}
294
295#[cfg(test)]
296mod tests {
297 use bytes::{Bytes, BytesMut};
298 use bytestring::ByteString;
299
300 use crate::codec::{Decode, Encode};
301 use crate::errors::AmqpCodecError;
302 use crate::protocol::Header;
303 use crate::types::Variant;
304
305 use super::InMessage;
306
307 #[test]
308 fn test_properties() -> Result<(), AmqpCodecError> {
309 let msg =
310 InMessage::with_body(Bytes::from_static(b"Hello world")).set_properties(|props| {
311 props.message_id = Some(Bytes::from_static(b"msg1").into());
312 props.content_type = Some("text".to_string().into());
313 props.correlation_id = Some(Bytes::from_static(b"no1").into());
314 props.content_encoding = Some("utf8+1".to_string().into());
315 });
316
317 let mut buf = BytesMut::with_capacity(msg.encoded_size());
318 msg.encode(&mut buf);
319
320 let msg2 = InMessage::decode(&buf)?.1;
321 let props = msg2.properties.as_ref().unwrap();
322 assert_eq!(props.message_id, Some(Bytes::from_static(b"msg1").into()));
323 assert_eq!(
324 props.correlation_id,
325 Some(Bytes::from_static(b"no1").into())
326 );
327 Ok(())
328 }
329
330 #[test]
331 fn test_app_properties() -> Result<(), AmqpCodecError> {
332 let msg = InMessage::default().set_app_property(ByteString::from("test"), 1);
333
334 let mut buf = BytesMut::with_capacity(msg.encoded_size());
335 msg.encode(&mut buf);
336
337 let msg2 = InMessage::decode(&buf)?.1;
338 let props = msg2.application_properties.as_ref().unwrap();
339 assert_eq!(*props.get("test").unwrap(), Variant::from(1));
340 Ok(())
341 }
342
343 #[test]
344 fn test_header() -> Result<(), AmqpCodecError> {
345 let hdr = Header {
346 durable: false,
347 priority: 1,
348 ttl: None,
349 first_acquirer: false,
350 delivery_count: 1,
351 };
352
353 let msg = InMessage::default().set_header(hdr.clone());
354 let mut buf = BytesMut::with_capacity(msg.encoded_size());
355 msg.encode(&mut buf);
356
357 let msg2 = InMessage::decode(&buf)?.1;
358 assert_eq!(msg2.header().unwrap(), &hdr);
359 Ok(())
360 }
361
362 #[test]
363 fn test_data() -> Result<(), AmqpCodecError> {
364 let data = Bytes::from_static(b"test data");
365
366 let msg = InMessage::default().set_body(|body| body.set_data(data.clone()));
367 let mut buf = BytesMut::with_capacity(msg.encoded_size());
368 msg.encode(&mut buf);
369
370 let msg2 = InMessage::decode(&buf)?.1;
371 assert_eq!(msg2.body.data().unwrap(), &data);
372 Ok(())
373 }
374
375 #[test]
376 fn test_data_empty() -> Result<(), AmqpCodecError> {
377 let msg = InMessage::default();
378 let mut buf = BytesMut::with_capacity(msg.encoded_size());
379 msg.encode(&mut buf);
380
381 let msg2 = InMessage::decode(&buf)?.1;
382 assert_eq!(msg2.body.data().unwrap(), &Bytes::from_static(b""));
383 Ok(())
384 }
385
386 #[test]
387 fn test_messages() -> Result<(), AmqpCodecError> {
388 let msg1 = InMessage::default().set_properties(|props| props.message_id = Some(1.into()));
389 let msg2 = InMessage::default().set_properties(|props| props.message_id = Some(2.into()));
390
391 let msg = InMessage::default().set_body(|body| {
392 body.messages.push(msg1.clone().into());
393 body.messages.push(msg2.clone().into());
394 });
395 let mut buf = BytesMut::with_capacity(msg.encoded_size());
396 msg.encode(&mut buf);
397
398 let msg3 = InMessage::decode(&buf)?.1;
399 let msg4 = InMessage::decode(&msg3.body.data().unwrap())?.1;
400 assert_eq!(msg1.properties, msg4.properties);
401
402 let msg5 = InMessage::decode(&msg3.body.data[1])?.1;
403 assert_eq!(msg2.properties, msg5.properties);
404 Ok(())
405 }
406}