ntex_amqp_codec/message/
message.rs

1use std::cell::Cell;
2
3use ntex_bytes::{Bytes, BytesMut};
4
5use crate::codec::{Decode, Encode};
6use crate::error::AmqpParseError;
7use crate::protocol::{Annotations, Header, MessageFormat, Properties, Section, TransferBody};
8use crate::types::{Descriptor, Str, Symbol, Variant, VecStringMap, VecSymbolMap};
9
10use super::body::MessageBody;
11use super::SECTION_PREFIX_LENGTH;
12
13#[derive(Debug, Clone, Default, PartialEq, Eq)]
14pub struct Message(pub Box<MessageInner>);
15
16#[derive(Debug, Clone, Default, PartialEq, Eq)]
17pub struct MessageInner {
18    pub message_format: Option<MessageFormat>,
19    pub header: Option<Header>,
20    pub delivery_annotations: Option<VecSymbolMap>,
21    pub message_annotations: Option<VecSymbolMap>,
22    pub properties: Option<Properties>,
23    pub application_properties: Option<VecStringMap>,
24    pub footer: Option<Annotations>,
25    pub body: MessageBody,
26    size: Cell<usize>,
27}
28
29impl Message {
30    #[inline]
31    /// Create new message and set body
32    pub fn with_body(body: Bytes) -> Message {
33        let mut msg = Message::default();
34        msg.0.body.data.push(body);
35        msg.0.message_format = Some(0);
36        msg
37    }
38
39    #[inline]
40    /// Create new message and set messages as body
41    pub fn with_messages(messages: Vec<TransferBody>) -> Message {
42        let mut msg = Message::default();
43        msg.0.body.messages = messages;
44        msg.0.message_format = Some(0);
45        msg
46    }
47
48    #[inline]
49    /// Header
50    pub fn header(&self) -> Option<&Header> {
51        self.0.header.as_ref()
52    }
53
54    #[inline]
55    /// Set message header
56    pub fn set_header(&mut self, header: Header) -> &mut Self {
57        self.0.header = Some(header);
58        self.0.size.set(0);
59        self
60    }
61
62    #[inline]
63    /// Set message format
64    pub fn set_format(&mut self, format: MessageFormat) -> &mut Self {
65        self.0.message_format = Some(format);
66        self
67    }
68
69    #[inline]
70    /// Message properties
71    pub fn properties(&self) -> Option<&Properties> {
72        self.0.properties.as_ref()
73    }
74
75    #[inline]
76    /// Mutable reference to properties
77    pub fn properties_mut(&mut self) -> &mut Properties {
78        if self.0.properties.is_none() {
79            self.0.properties = Some(Properties::default());
80        }
81
82        self.0.size.set(0);
83        self.0.properties.as_mut().unwrap()
84    }
85
86    #[inline]
87    /// Add property
88    pub fn set_properties<F>(&mut self, f: F) -> &mut Self
89    where
90        F: FnOnce(&mut Properties),
91    {
92        if let Some(ref mut props) = self.0.properties {
93            f(props);
94        } else {
95            let mut props = Properties::default();
96            f(&mut props);
97            self.0.properties = Some(props);
98        }
99        self.0.size.set(0);
100        self
101    }
102
103    #[inline]
104    /// Get application property
105    pub fn app_properties(&self) -> Option<&VecStringMap> {
106        self.0.application_properties.as_ref()
107    }
108
109    #[inline]
110    /// Mut ref tp application property
111    pub fn app_properties_mut(&mut self) -> Option<&mut VecStringMap> {
112        self.0.application_properties.as_mut()
113    }
114
115    #[inline]
116    /// Get application property
117    pub fn app_property(&self, key: &str) -> Option<&Variant> {
118        if let Some(ref props) = self.0.application_properties {
119            props
120                .iter()
121                .find_map(|item| if &item.0 == key { Some(&item.1) } else { None })
122        } else {
123            None
124        }
125    }
126
127    #[inline]
128    /// Add application property
129    pub fn set_app_property<K, V>(&mut self, key: K, value: V) -> &mut Self
130    where
131        K: Into<Str>,
132        V: Into<Variant>,
133    {
134        if let Some(ref mut props) = self.0.application_properties {
135            props.push((key.into(), value.into()));
136        } else {
137            let mut props = VecStringMap::default();
138            props.push((key.into(), value.into()));
139            self.0.application_properties = Some(props);
140        }
141        self.0.size.set(0);
142        self
143    }
144
145    #[inline]
146    /// Get message annotation
147    pub fn message_annotation(&self, key: &str) -> Option<&Variant> {
148        if let Some(ref props) = self.0.message_annotations {
149            props
150                .iter()
151                .find_map(|item| if &item.0 == key { Some(&item.1) } else { None })
152        } else {
153            None
154        }
155    }
156
157    #[inline]
158    /// Add message annotation
159    pub fn add_message_annotation<K, V>(&mut self, key: K, value: V) -> &mut Self
160    where
161        K: Into<Symbol>,
162        V: Into<Variant>,
163    {
164        if let Some(ref mut props) = self.0.message_annotations {
165            props.push((key.into(), value.into()));
166        } else {
167            let mut props = VecSymbolMap::default();
168            props.push((key.into(), value.into()));
169            self.0.message_annotations = Some(props);
170        }
171        self.0.size.set(0);
172        self
173    }
174
175    #[inline]
176    /// Get message annotations
177    pub fn message_annotations(&self) -> Option<&VecSymbolMap> {
178        self.0.message_annotations.as_ref()
179    }
180
181    #[inline]
182    /// Mut reference to message annotations
183    pub fn message_annotations_mut(&mut self) -> Option<&mut VecSymbolMap> {
184        self.0.message_annotations.as_mut()
185    }
186
187    #[inline]
188    /// Delivery annotations
189    pub fn delivery_annotations(&self) -> Option<&VecSymbolMap> {
190        self.0.delivery_annotations.as_ref()
191    }
192
193    #[inline]
194    /// Mut reference to delivery annotations
195    pub fn delivery_annotations_mut(&mut self) -> Option<&mut VecSymbolMap> {
196        self.0.delivery_annotations.as_mut()
197    }
198
199    #[inline]
200    /// Call closure with message reference
201    pub fn update<F>(self, f: F) -> Self
202    where
203        F: Fn(Self) -> Self,
204    {
205        self.0.size.set(0);
206        f(self)
207    }
208
209    #[inline]
210    /// Call closure if value is Some value
211    pub fn if_some<T, F>(self, value: &Option<T>, f: F) -> Self
212    where
213        F: Fn(Self, &T) -> Self,
214    {
215        if let Some(ref val) = value {
216            self.0.size.set(0);
217            f(self, val)
218        } else {
219            self
220        }
221    }
222
223    #[inline]
224    /// Message body
225    pub fn body(&self) -> &MessageBody {
226        &self.0.body
227    }
228
229    #[inline]
230    /// Mutable message body
231    pub fn body_mut(&mut self) -> &mut MessageBody {
232        &mut self.0.body
233    }
234
235    #[inline]
236    /// Message value
237    pub fn value(&self) -> Option<&Variant> {
238        self.0.body.value.as_ref()
239    }
240
241    #[inline]
242    /// Set message body value
243    pub fn set_value<V: Into<Variant>>(&mut self, v: V) -> &mut Self {
244        self.0.body.value = Some(v.into());
245        self
246    }
247
248    #[inline]
249    /// Set message body
250    pub fn set_body<F>(&mut self, f: F) -> &mut Self
251    where
252        F: FnOnce(&mut MessageBody),
253    {
254        f(&mut self.0.body);
255        self.0.size.set(0);
256        self
257    }
258
259    #[inline]
260    /// Create new message and set `correlation_id` property
261    pub fn reply_message(&self) -> Message {
262        Message::default().if_some(&self.0.properties, |mut msg, data| {
263            msg.set_properties(|props| props.correlation_id.clone_from(&data.message_id));
264            msg
265        })
266    }
267}
268
269impl Decode for Message {
270    fn decode(input: &mut Bytes) -> Result<Message, AmqpParseError> {
271        let mut message = Message::default();
272
273        loop {
274            if input.is_empty() {
275                break;
276            }
277
278            let sec = Section::decode(input)?;
279            match sec {
280                Section::Header(val) => {
281                    message.0.header = Some(val);
282                }
283                Section::DeliveryAnnotations(val) => {
284                    message.0.delivery_annotations = Some(val);
285                }
286                Section::MessageAnnotations(val) => {
287                    message.0.message_annotations = Some(val);
288                }
289                Section::ApplicationProperties(val) => {
290                    message.0.application_properties = Some(val);
291                }
292                Section::Footer(val) => {
293                    message.0.footer = Some(val);
294                }
295                Section::Properties(val) => {
296                    message.0.properties = Some(val);
297                }
298
299                // body
300                Section::AmqpSequence(val) => {
301                    message.0.body.sequence.push(val);
302                }
303                Section::AmqpValue(val) => {
304                    message.0.body.value = Some(val);
305                }
306                Section::Data(val) => {
307                    message.0.body.data.push(val);
308                }
309            }
310        }
311        Ok(message)
312    }
313}
314
315impl Encode for Message {
316    fn encoded_size(&self) -> usize {
317        let size = self.0.size.get();
318        if size != 0 {
319            return size;
320        }
321
322        let mut size = self.0.body.encoded_size();
323
324        if let Some(ref h) = self.0.header {
325            size += h.encoded_size();
326        }
327        if let Some(ref da) = self.0.delivery_annotations {
328            size += da.encoded_size() + SECTION_PREFIX_LENGTH;
329        }
330        if let Some(ref ma) = self.0.message_annotations {
331            size += ma.encoded_size() + SECTION_PREFIX_LENGTH;
332        }
333        if let Some(ref p) = self.0.properties {
334            size += p.encoded_size();
335        }
336        if let Some(ref ap) = self.0.application_properties {
337            size += ap.encoded_size() + SECTION_PREFIX_LENGTH;
338        }
339        if let Some(ref f) = self.0.footer {
340            size += f.encoded_size() + SECTION_PREFIX_LENGTH;
341        }
342        self.0.size.set(size);
343        size
344    }
345
346    fn encode(&self, dst: &mut BytesMut) {
347        if let Some(ref h) = self.0.header {
348            h.encode(dst);
349        }
350        if let Some(ref da) = self.0.delivery_annotations {
351            Descriptor::Ulong(113).encode(dst);
352            da.encode(dst);
353        }
354        if let Some(ref ma) = self.0.message_annotations {
355            Descriptor::Ulong(114).encode(dst);
356            ma.encode(dst);
357        }
358        if let Some(ref p) = self.0.properties {
359            p.encode(dst);
360        }
361        if let Some(ref ap) = self.0.application_properties {
362            Descriptor::Ulong(116).encode(dst);
363            ap.encode(dst);
364        }
365
366        // message body
367        self.0.body.encode(dst);
368
369        // message footer, always last item
370        if let Some(ref f) = self.0.footer {
371            Descriptor::Ulong(120).encode(dst);
372            f.encode(dst);
373        }
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use ntex_bytes::{ByteString, Bytes, BytesMut};
380    use uuid::Uuid;
381
382    use crate::codec::{Decode, Encode};
383    use crate::error::AmqpCodecError;
384    use crate::protocol::Header;
385    use crate::types::Variant;
386
387    use super::Message;
388
389    #[test]
390    fn test_properties() -> Result<(), AmqpCodecError> {
391        let mut msg = Message::default();
392        msg.set_properties(|props| props.message_id = Some(1.into()));
393
394        let mut buf = BytesMut::with_capacity(msg.encoded_size());
395        msg.encode(&mut buf);
396
397        let msg2 = Message::decode(&mut buf.freeze())?;
398        let props = msg2.properties().unwrap();
399        assert_eq!(props.message_id, Some(1.into()));
400        Ok(())
401    }
402
403    #[test]
404    fn test_app_properties() -> Result<(), AmqpCodecError> {
405        let mut msg = Message::default();
406        msg.set_app_property(ByteString::from("test"), 1);
407
408        let mut buf = BytesMut::with_capacity(msg.encoded_size());
409        msg.encode(&mut buf);
410
411        let msg2 = Message::decode(&mut buf.freeze())?;
412        let props = msg2.app_properties().unwrap();
413        assert_eq!(props[0].0.as_str(), "test");
414        assert_eq!(props[0].1, Variant::from(1));
415        Ok(())
416    }
417
418    #[test]
419    fn test_header() -> Result<(), AmqpCodecError> {
420        let hdr = Header {
421            durable: false,
422            priority: 1,
423            ttl: None,
424            first_acquirer: false,
425            delivery_count: 1,
426        };
427
428        let mut msg = Message::default();
429        msg.set_header(hdr.clone());
430        let mut buf = BytesMut::with_capacity(msg.encoded_size());
431        msg.encode(&mut buf);
432
433        let msg2 = Message::decode(&mut buf.freeze())?;
434        assert_eq!(msg2.header().unwrap(), &hdr);
435        Ok(())
436    }
437
438    #[test]
439    fn test_data() -> Result<(), AmqpCodecError> {
440        let data = Bytes::from_static(b"test data");
441
442        let mut msg = Message::default();
443        msg.set_body(|body| body.set_data(data.clone()));
444        let mut buf = BytesMut::with_capacity(msg.encoded_size());
445        msg.encode(&mut buf);
446
447        let msg2 = Message::decode(&mut buf.freeze())?;
448        assert_eq!(msg2.body().data().unwrap(), &data);
449        Ok(())
450    }
451
452    #[test]
453    fn test_data_empty() -> Result<(), AmqpCodecError> {
454        let msg = Message::default();
455        let mut buf = BytesMut::with_capacity(msg.encoded_size());
456        msg.encode(&mut buf);
457        assert_eq!(buf, Bytes::from_static(b""));
458
459        let msg2 = Message::decode(&mut buf.freeze())?;
460        assert!(msg2.body().data().is_none());
461        Ok(())
462    }
463
464    #[test]
465    fn test_messages() -> Result<(), AmqpCodecError> {
466        let mut msg1 = Message::default();
467        msg1.set_properties(|props| props.message_id = Some(1.into()));
468        let mut msg2 = Message::default();
469        msg2.set_properties(|props| props.message_id = Some(2.into()));
470
471        let mut msg = Message::default();
472        msg.set_body(|body| {
473            body.messages.push(msg1.clone().into());
474            body.messages.push(msg2.clone().into());
475        });
476        let mut buf = BytesMut::with_capacity(msg.encoded_size());
477        msg.encode(&mut buf);
478
479        let msg3 = Message::decode(&mut buf.freeze())?;
480        let msg4 = Message::decode(&mut msg3.body().data().unwrap().clone())?;
481        assert_eq!(msg1.properties(), msg4.properties());
482
483        let msg5 = Message::decode(&mut msg3.body().data[1].clone())?;
484        assert_eq!(msg2.properties(), msg5.properties());
485        Ok(())
486    }
487
488    #[test]
489    fn test_messages_codec() -> Result<(), AmqpCodecError> {
490        let mut msg = Message::default();
491        msg.set_properties(|props| props.message_id = Some(Uuid::new_v4().into()));
492
493        let mut buf = BytesMut::with_capacity(msg.encoded_size());
494        msg.encode(&mut buf);
495
496        let msg2 = Message::decode(&mut buf.freeze())?;
497        assert_eq!(msg.properties(), msg2.properties());
498        Ok(())
499    }
500}