amqp_codec/message/
inmessage.rs

1use std::cell::Cell;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use fxhash::FxHashMap;
5
6use crate::codec::{Decode, Encode, FORMATCODE_BINARY8};
7use crate::errors::AmqpParseError;
8use crate::protocol::{
9    Annotations, Header, MessageFormat, Properties, Section, StringVariantMap, TransferBody,
10};
11use crate::types::{Descriptor, Str, Variant};
12
13use super::body::MessageBody;
14use super::outmessage::OutMessage;
15use super::SECTION_PREFIX_LENGTH;
16
17#[derive(Debug, Clone, Default, PartialEq)]
18pub struct InMessage {
19    pub message_format: Option<MessageFormat>,
20    pub(super) header: Option<Header>,
21    pub(super) delivery_annotations: Option<Annotations>,
22    pub(super) message_annotations: Option<Annotations>,
23    pub(super) properties: Option<Properties>,
24    pub(super) application_properties: Option<StringVariantMap>,
25    pub(super) footer: Option<Annotations>,
26    pub(super) body: MessageBody,
27    pub(super) size: Cell<usize>,
28}
29
30impl InMessage {
31    /// Create new message and set body
32    pub fn with_body(body: Bytes) -> InMessage {
33        let mut msg = InMessage::default();
34        msg.body.data.push(body);
35        msg
36    }
37
38    /// Create new message and set messages as body
39    pub fn with_messages(messages: Vec<TransferBody>) -> InMessage {
40        let mut msg = InMessage::default();
41        msg.body.messages = messages;
42        msg
43    }
44
45    /// Header
46    pub fn header(&self) -> Option<&Header> {
47        self.header.as_ref()
48    }
49
50    /// Set message header
51    pub fn set_header(mut self, header: Header) -> Self {
52        self.header = Some(header);
53        self.size.set(0);
54        self
55    }
56
57    /// Message properties
58    pub fn properties(&self) -> Option<&Properties> {
59        self.properties.as_ref()
60    }
61
62    /// Add property
63    pub fn set_properties<F>(mut self, f: F) -> Self
64    where
65        F: Fn(&mut Properties),
66    {
67        if let Some(ref mut props) = self.properties {
68            f(props);
69        } else {
70            let mut props = Properties::default();
71            f(&mut props);
72            self.properties = Some(props);
73        }
74        self.size.set(0);
75        self
76    }
77
78    /// Get application property
79    pub fn app_property(&self, key: &str) -> Option<&Variant> {
80        if let Some(ref props) = self.application_properties {
81            props.get(key)
82        } else {
83            None
84        }
85    }
86
87    /// Get application properties
88    pub fn app_properties(&self) -> Option<&StringVariantMap> {
89        self.application_properties.as_ref()
90    }
91
92    /// Get message annotation
93    pub fn message_annotation(&self, key: &str) -> Option<&Variant> {
94        if let Some(ref props) = self.message_annotations {
95            props.get(key)
96        } else {
97            None
98        }
99    }
100
101    /// Add application property
102    pub fn set_app_property<K: Into<Str>, V: Into<Variant>>(mut self, key: K, value: V) -> Self {
103        if let Some(ref mut props) = self.application_properties {
104            props.insert(key.into(), value.into());
105        } else {
106            let mut props = FxHashMap::default();
107            props.insert(key.into(), value.into());
108            self.application_properties = Some(props);
109        }
110        self.size.set(0);
111        self
112    }
113
114    /// Call closure with message reference
115    pub fn update<F>(self, f: F) -> Self
116    where
117        F: Fn(Self) -> Self,
118    {
119        self.size.set(0);
120        f(self)
121    }
122
123    /// Call closure if value is Some value
124    pub fn if_some<T, F>(self, value: &Option<T>, f: F) -> Self
125    where
126        F: Fn(Self, &T) -> Self,
127    {
128        if let Some(ref val) = value {
129            self.size.set(0);
130            f(self, val)
131        } else {
132            self
133        }
134    }
135
136    /// Message body
137    pub fn body(&self) -> &MessageBody {
138        &self.body
139    }
140
141    /// Message value
142    pub fn value(&self) -> Option<&Variant> {
143        self.body.value.as_ref()
144    }
145
146    /// Set message body value
147    pub fn set_value<V: Into<Variant>>(mut self, v: V) -> Self {
148        self.body.value = Some(v.into());
149        self
150    }
151
152    /// Set message body
153    pub fn set_body<F>(mut self, f: F) -> Self
154    where
155        F: Fn(&mut MessageBody),
156    {
157        f(&mut self.body);
158        self.size.set(0);
159        self
160    }
161
162    /// Create new message and set `correlation_id` property
163    pub fn reply_message(&self) -> OutMessage {
164        let mut msg = OutMessage::default().if_some(&self.properties, |mut msg, data| {
165            msg.set_properties(|props| props.correlation_id = data.message_id.clone());
166            msg
167        });
168        msg.message_format = self.message_format;
169        msg
170    }
171}
172
173impl Decode for InMessage {
174    fn decode(mut input: &[u8]) -> Result<(&[u8], InMessage), AmqpParseError> {
175        let mut message = InMessage::default();
176
177        loop {
178            let (buf, sec) = Section::decode(input)?;
179            match sec {
180                Section::Header(val) => {
181                    message.header = Some(val);
182                }
183                Section::DeliveryAnnotations(val) => {
184                    message.delivery_annotations = Some(val);
185                }
186                Section::MessageAnnotations(val) => {
187                    message.message_annotations = Some(val);
188                }
189                Section::ApplicationProperties(val) => {
190                    message.application_properties = Some(val);
191                }
192                Section::Footer(val) => {
193                    message.footer = Some(val);
194                }
195                Section::Properties(val) => {
196                    message.properties = Some(val);
197                }
198
199                // body
200                Section::AmqpSequence(val) => {
201                    message.body.sequence.push(val);
202                }
203                Section::AmqpValue(val) => {
204                    message.body.value = Some(val);
205                }
206                Section::Data(val) => {
207                    message.body.data.push(val);
208                }
209            }
210            if buf.is_empty() {
211                break;
212            }
213            input = buf;
214        }
215        Ok((input, message))
216    }
217}
218
219impl Encode for InMessage {
220    fn encoded_size(&self) -> usize {
221        let size = self.size.get();
222        if size != 0 {
223            return size;
224        }
225
226        // body size, always add empty body if needed
227        let body_size = self.body.encoded_size();
228        let mut size = if body_size == 0 {
229            // empty bytes
230            SECTION_PREFIX_LENGTH + 2
231        } else {
232            body_size
233        };
234
235        if let Some(ref h) = self.header {
236            size += h.encoded_size() + SECTION_PREFIX_LENGTH;
237        }
238        if let Some(ref da) = self.delivery_annotations {
239            size += da.encoded_size() + SECTION_PREFIX_LENGTH;
240        }
241        if let Some(ref ma) = self.message_annotations {
242            size += ma.encoded_size() + SECTION_PREFIX_LENGTH;
243        }
244        if let Some(ref p) = self.properties {
245            size += p.encoded_size();
246        }
247        if let Some(ref ap) = self.application_properties {
248            size += ap.encoded_size() + SECTION_PREFIX_LENGTH;
249        }
250        if let Some(ref f) = self.footer {
251            size += f.encoded_size() + SECTION_PREFIX_LENGTH;
252        }
253        self.size.set(size);
254        size
255    }
256
257    fn encode(&self, dst: &mut BytesMut) {
258        if let Some(ref h) = self.header {
259            h.encode(dst);
260        }
261        if let Some(ref da) = self.delivery_annotations {
262            Descriptor::Ulong(113).encode(dst);
263            da.encode(dst);
264        }
265        if let Some(ref ma) = self.message_annotations {
266            Descriptor::Ulong(114).encode(dst);
267            ma.encode(dst);
268        }
269        if let Some(ref p) = self.properties {
270            p.encode(dst);
271        }
272        if let Some(ref ap) = self.application_properties {
273            Descriptor::Ulong(116).encode(dst);
274            ap.encode(dst);
275        }
276
277        // message body
278        if self.body.encoded_size() == 0 {
279            // special treatment for empty body
280            Descriptor::Ulong(117).encode(dst);
281            dst.put_u8(FORMATCODE_BINARY8);
282            dst.put_u8(0);
283        } else {
284            self.body.encode(dst);
285        }
286
287        // message footer, always last item
288        if let Some(ref f) = self.footer {
289            Descriptor::Ulong(120).encode(dst);
290            f.encode(dst);
291        }
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use bytes::{Bytes, BytesMut};
298    use bytestring::ByteString;
299
300    use crate::codec::{Decode, Encode};
301    use crate::errors::AmqpCodecError;
302    use crate::protocol::Header;
303    use crate::types::Variant;
304
305    use super::InMessage;
306
307    #[test]
308    fn test_properties() -> Result<(), AmqpCodecError> {
309        let msg =
310            InMessage::with_body(Bytes::from_static(b"Hello world")).set_properties(|props| {
311                props.message_id = Some(Bytes::from_static(b"msg1").into());
312                props.content_type = Some("text".to_string().into());
313                props.correlation_id = Some(Bytes::from_static(b"no1").into());
314                props.content_encoding = Some("utf8+1".to_string().into());
315            });
316
317        let mut buf = BytesMut::with_capacity(msg.encoded_size());
318        msg.encode(&mut buf);
319
320        let msg2 = InMessage::decode(&buf)?.1;
321        let props = msg2.properties.as_ref().unwrap();
322        assert_eq!(props.message_id, Some(Bytes::from_static(b"msg1").into()));
323        assert_eq!(
324            props.correlation_id,
325            Some(Bytes::from_static(b"no1").into())
326        );
327        Ok(())
328    }
329
330    #[test]
331    fn test_app_properties() -> Result<(), AmqpCodecError> {
332        let msg = InMessage::default().set_app_property(ByteString::from("test"), 1);
333
334        let mut buf = BytesMut::with_capacity(msg.encoded_size());
335        msg.encode(&mut buf);
336
337        let msg2 = InMessage::decode(&buf)?.1;
338        let props = msg2.application_properties.as_ref().unwrap();
339        assert_eq!(*props.get("test").unwrap(), Variant::from(1));
340        Ok(())
341    }
342
343    #[test]
344    fn test_header() -> Result<(), AmqpCodecError> {
345        let hdr = Header {
346            durable: false,
347            priority: 1,
348            ttl: None,
349            first_acquirer: false,
350            delivery_count: 1,
351        };
352
353        let msg = InMessage::default().set_header(hdr.clone());
354        let mut buf = BytesMut::with_capacity(msg.encoded_size());
355        msg.encode(&mut buf);
356
357        let msg2 = InMessage::decode(&buf)?.1;
358        assert_eq!(msg2.header().unwrap(), &hdr);
359        Ok(())
360    }
361
362    #[test]
363    fn test_data() -> Result<(), AmqpCodecError> {
364        let data = Bytes::from_static(b"test data");
365
366        let msg = InMessage::default().set_body(|body| body.set_data(data.clone()));
367        let mut buf = BytesMut::with_capacity(msg.encoded_size());
368        msg.encode(&mut buf);
369
370        let msg2 = InMessage::decode(&buf)?.1;
371        assert_eq!(msg2.body.data().unwrap(), &data);
372        Ok(())
373    }
374
375    #[test]
376    fn test_data_empty() -> Result<(), AmqpCodecError> {
377        let msg = InMessage::default();
378        let mut buf = BytesMut::with_capacity(msg.encoded_size());
379        msg.encode(&mut buf);
380
381        let msg2 = InMessage::decode(&buf)?.1;
382        assert_eq!(msg2.body.data().unwrap(), &Bytes::from_static(b""));
383        Ok(())
384    }
385
386    #[test]
387    fn test_messages() -> Result<(), AmqpCodecError> {
388        let msg1 = InMessage::default().set_properties(|props| props.message_id = Some(1.into()));
389        let msg2 = InMessage::default().set_properties(|props| props.message_id = Some(2.into()));
390
391        let msg = InMessage::default().set_body(|body| {
392            body.messages.push(msg1.clone().into());
393            body.messages.push(msg2.clone().into());
394        });
395        let mut buf = BytesMut::with_capacity(msg.encoded_size());
396        msg.encode(&mut buf);
397
398        let msg3 = InMessage::decode(&buf)?.1;
399        let msg4 = InMessage::decode(&msg3.body.data().unwrap())?.1;
400        assert_eq!(msg1.properties, msg4.properties);
401
402        let msg5 = InMessage::decode(&msg3.body.data[1])?.1;
403        assert_eq!(msg2.properties, msg5.properties);
404        Ok(())
405    }
406}