amqp_codec/message/
outmessage.rs

1use std::cell::Cell;
2
3use bytes::{BufMut, Bytes, BytesMut};
4
5use crate::codec::{Decode, Encode, FORMATCODE_BINARY8};
6use crate::errors::AmqpParseError;
7use crate::protocol::{Annotations, Header, MessageFormat, Properties, Section, TransferBody};
8use crate::types::{Descriptor, Str, Symbol, Variant, VecStringMap, VecSymbolMap};
9
10use super::body::MessageBody;
11use super::inmessage::InMessage;
12use super::SECTION_PREFIX_LENGTH;
13
14#[derive(Debug, Clone, Default, PartialEq)]
15pub struct OutMessage {
16    pub message_format: Option<MessageFormat>,
17    header: Option<Header>,
18    delivery_annotations: Option<Annotations>,
19    message_annotations: Option<VecSymbolMap>,
20    properties: Option<Properties>,
21    application_properties: Option<VecStringMap>,
22    footer: Option<Annotations>,
23    body: MessageBody,
24    size: Cell<usize>,
25}
26
27impl OutMessage {
28    /// Create new message and set body
29    pub fn with_body(body: Bytes) -> OutMessage {
30        let mut msg = OutMessage::default();
31        msg.body.data.push(body);
32        msg.message_format = Some(0);
33        msg
34    }
35
36    /// Create new message and set messages as body
37    pub fn with_messages(messages: Vec<TransferBody>) -> OutMessage {
38        let mut msg = OutMessage::default();
39        msg.body.messages = messages;
40        msg.message_format = Some(0);
41        msg
42    }
43
44    /// Header
45    pub fn header(&self) -> Option<&Header> {
46        self.header.as_ref()
47    }
48
49    /// Set message header
50    pub fn set_header(&mut self, header: Header) -> &mut Self {
51        self.header = Some(header);
52        self.size.set(0);
53        self
54    }
55
56    /// Message properties
57    pub fn properties(&self) -> Option<&Properties> {
58        self.properties.as_ref()
59    }
60
61    /// Mutable reference to properties
62    pub fn properties_mut(&mut self) -> &mut Properties {
63        if self.properties.is_none() {
64            self.properties = Some(Properties::default());
65        }
66
67        self.size.set(0);
68        self.properties.as_mut().unwrap()
69    }
70
71    /// Add property
72    pub fn set_properties<F>(&mut self, f: F) -> &mut Self
73    where
74        F: Fn(&mut Properties),
75    {
76        if let Some(ref mut props) = self.properties {
77            f(props);
78        } else {
79            let mut props = Properties::default();
80            f(&mut props);
81            self.properties = Some(props);
82        }
83        self.size.set(0);
84        self
85    }
86
87    /// Get application property
88    pub fn app_properties(&self) -> Option<&VecStringMap> {
89        self.application_properties.as_ref()
90    }
91
92    /// Add application property
93    pub fn set_app_property<K, V>(&mut self, key: K, value: V) -> &mut Self
94    where
95        K: Into<Str>,
96        V: Into<Variant>,
97    {
98        if let Some(ref mut props) = self.application_properties {
99            props.push((key.into(), value.into()));
100        } else {
101            let mut props = VecStringMap::default();
102            props.push((key.into(), value.into()));
103            self.application_properties = Some(props);
104        }
105        self.size.set(0);
106        self
107    }
108
109    /// Add message annotation
110    pub fn add_message_annotation<K, V>(&mut self, key: K, value: V) -> &mut Self
111    where
112        K: Into<Symbol>,
113        V: Into<Variant>,
114    {
115        if let Some(ref mut props) = self.message_annotations {
116            props.push((key.into(), value.into()));
117        } else {
118            let mut props = VecSymbolMap::default();
119            props.push((key.into(), value.into()));
120            self.message_annotations = Some(props);
121        }
122        self.size.set(0);
123        self
124    }
125
126    /// Call closure with message reference
127    pub fn update<F>(self, f: F) -> Self
128    where
129        F: Fn(Self) -> Self,
130    {
131        self.size.set(0);
132        f(self)
133    }
134
135    /// Call closure if value is Some value
136    pub fn if_some<T, F>(self, value: &Option<T>, f: F) -> Self
137    where
138        F: Fn(Self, &T) -> Self,
139    {
140        if let Some(ref val) = value {
141            self.size.set(0);
142            f(self, val)
143        } else {
144            self
145        }
146    }
147
148    /// Message body
149    pub fn body(&self) -> &MessageBody {
150        &self.body
151    }
152
153    /// Message value
154    pub fn value(&self) -> Option<&Variant> {
155        self.body.value.as_ref()
156    }
157
158    /// Set message body value
159    pub fn set_value<V: Into<Variant>>(&mut self, v: V) -> &mut Self {
160        self.body.value = Some(v.into());
161        self
162    }
163
164    /// Set message body
165    pub fn set_body<F>(&mut self, f: F) -> &mut Self
166    where
167        F: Fn(&mut MessageBody),
168    {
169        f(&mut self.body);
170        self.size.set(0);
171        self
172    }
173
174    /// Create new message and set `correlation_id` property
175    pub fn reply_message(&self) -> OutMessage {
176        OutMessage::default().if_some(&self.properties, |mut msg, data| {
177            msg.set_properties(|props| props.correlation_id = data.message_id.clone());
178            msg
179        })
180    }
181}
182
183impl From<InMessage> for OutMessage {
184    fn from(from: InMessage) -> Self {
185        let mut msg = OutMessage {
186            message_format: from.message_format,
187            header: from.header,
188            properties: from.properties,
189            delivery_annotations: from.delivery_annotations,
190            message_annotations: from.message_annotations.map(|v| v.into()),
191            application_properties: from.application_properties.map(|v| v.into()),
192            footer: from.footer,
193            body: from.body,
194            size: Cell::new(0),
195        };
196
197        if let Some(ref mut props) = msg.properties {
198            props.correlation_id = props.message_id.clone();
199        };
200
201        msg
202    }
203}
204
205impl Decode for OutMessage {
206    fn decode(mut input: &[u8]) -> Result<(&[u8], OutMessage), AmqpParseError> {
207        let mut message = OutMessage::default();
208
209        loop {
210            let (buf, sec) = Section::decode(input)?;
211            match sec {
212                Section::Header(val) => {
213                    message.header = Some(val);
214                }
215                Section::DeliveryAnnotations(val) => {
216                    message.delivery_annotations = Some(val);
217                }
218                Section::MessageAnnotations(val) => {
219                    message.message_annotations = Some(VecSymbolMap(
220                        val.into_iter().map(|(k, v)| (k.0.into(), v)).collect(),
221                    ));
222                }
223                Section::ApplicationProperties(val) => {
224                    message.application_properties = Some(VecStringMap(
225                        val.into_iter().map(|(k, v)| (k.into(), v)).collect(),
226                    ));
227                }
228                Section::Footer(val) => {
229                    message.footer = Some(val);
230                }
231                Section::Properties(val) => {
232                    message.properties = Some(val);
233                }
234
235                // body
236                Section::AmqpSequence(val) => {
237                    message.body.sequence.push(val);
238                }
239                Section::AmqpValue(val) => {
240                    message.body.value = Some(val);
241                }
242                Section::Data(val) => {
243                    message.body.data.push(val);
244                }
245            }
246            if buf.is_empty() {
247                break;
248            }
249            input = buf;
250        }
251        Ok((input, message))
252    }
253}
254
255impl Encode for OutMessage {
256    fn encoded_size(&self) -> usize {
257        let size = self.size.get();
258        if size != 0 {
259            return size;
260        }
261
262        // body size, always add empty body if needed
263        let body_size = self.body.encoded_size();
264        let mut size = if body_size == 0 {
265            // empty bytes
266            SECTION_PREFIX_LENGTH + 2
267        } else {
268            body_size
269        };
270
271        if let Some(ref h) = self.header {
272            size += h.encoded_size() + SECTION_PREFIX_LENGTH;
273        }
274        if let Some(ref da) = self.delivery_annotations {
275            size += da.encoded_size() + SECTION_PREFIX_LENGTH;
276        }
277        if let Some(ref ma) = self.message_annotations {
278            size += ma.encoded_size() + SECTION_PREFIX_LENGTH;
279        }
280        if let Some(ref p) = self.properties {
281            size += p.encoded_size();
282        }
283        if let Some(ref ap) = self.application_properties {
284            size += ap.encoded_size() + SECTION_PREFIX_LENGTH;
285        }
286        if let Some(ref f) = self.footer {
287            size += f.encoded_size() + SECTION_PREFIX_LENGTH;
288        }
289        self.size.set(size);
290        size
291    }
292
293    fn encode(&self, dst: &mut BytesMut) {
294        if let Some(ref h) = self.header {
295            h.encode(dst);
296        }
297        if let Some(ref da) = self.delivery_annotations {
298            Descriptor::Ulong(113).encode(dst);
299            da.encode(dst);
300        }
301        if let Some(ref ma) = self.message_annotations {
302            Descriptor::Ulong(114).encode(dst);
303            ma.encode(dst);
304        }
305        if let Some(ref p) = self.properties {
306            p.encode(dst);
307        }
308        if let Some(ref ap) = self.application_properties {
309            Descriptor::Ulong(116).encode(dst);
310            ap.encode(dst);
311        }
312
313        // message body
314        if self.body.encoded_size() == 0 {
315            // special treatment for empty body
316            Descriptor::Ulong(117).encode(dst);
317            dst.put_u8(FORMATCODE_BINARY8);
318            dst.put_u8(0);
319        } else {
320            self.body.encode(dst);
321        }
322
323        // message footer, always last item
324        if let Some(ref f) = self.footer {
325            Descriptor::Ulong(120).encode(dst);
326            f.encode(dst);
327        }
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use bytes::{Bytes, BytesMut};
334    use bytestring::ByteString;
335
336    use crate::codec::{Decode, Encode};
337    use crate::errors::AmqpCodecError;
338    use crate::protocol::Header;
339    use crate::types::Variant;
340
341    use super::OutMessage;
342
343    #[test]
344    fn test_properties() -> Result<(), AmqpCodecError> {
345        let mut msg = OutMessage::default();
346        msg.set_properties(|props| props.message_id = Some(1.into()));
347
348        let mut buf = BytesMut::with_capacity(msg.encoded_size());
349        msg.encode(&mut buf);
350
351        let msg2 = OutMessage::decode(&buf)?.1;
352        let props = msg2.properties.as_ref().unwrap();
353        assert_eq!(props.message_id, Some(1.into()));
354        Ok(())
355    }
356
357    #[test]
358    fn test_app_properties() -> Result<(), AmqpCodecError> {
359        let mut msg = OutMessage::default();
360        msg.set_app_property(ByteString::from("test"), 1);
361
362        let mut buf = BytesMut::with_capacity(msg.encoded_size());
363        msg.encode(&mut buf);
364
365        let msg2 = OutMessage::decode(&buf)?.1;
366        let props = msg2.application_properties.as_ref().unwrap();
367        assert_eq!(props[0].0.as_str(), "test");
368        assert_eq!(props[0].1, Variant::from(1));
369        Ok(())
370    }
371
372    #[test]
373    fn test_header() -> Result<(), AmqpCodecError> {
374        let hdr = Header {
375            durable: false,
376            priority: 1,
377            ttl: None,
378            first_acquirer: false,
379            delivery_count: 1,
380        };
381
382        let mut msg = OutMessage::default();
383        msg.set_header(hdr.clone());
384        let mut buf = BytesMut::with_capacity(msg.encoded_size());
385        msg.encode(&mut buf);
386
387        let msg2 = OutMessage::decode(&buf)?.1;
388        assert_eq!(msg2.header().unwrap(), &hdr);
389        Ok(())
390    }
391
392    #[test]
393    fn test_data() -> Result<(), AmqpCodecError> {
394        let data = Bytes::from_static(b"test data");
395
396        let mut msg = OutMessage::default();
397        msg.set_body(|body| body.set_data(data.clone()));
398        let mut buf = BytesMut::with_capacity(msg.encoded_size());
399        msg.encode(&mut buf);
400
401        let msg2 = OutMessage::decode(&buf)?.1;
402        assert_eq!(msg2.body.data().unwrap(), &data);
403        Ok(())
404    }
405
406    #[test]
407    fn test_data_empty() -> Result<(), AmqpCodecError> {
408        let msg = OutMessage::default();
409        let mut buf = BytesMut::with_capacity(msg.encoded_size());
410        msg.encode(&mut buf);
411
412        let msg2 = OutMessage::decode(&buf)?.1;
413        assert_eq!(msg2.body.data().unwrap(), &Bytes::from_static(b""));
414        Ok(())
415    }
416
417    #[test]
418    fn test_messages() -> Result<(), AmqpCodecError> {
419        let mut msg1 = OutMessage::default();
420        msg1.set_properties(|props| props.message_id = Some(1.into()));
421        let mut msg2 = OutMessage::default();
422        msg2.set_properties(|props| props.message_id = Some(2.into()));
423
424        let mut msg = OutMessage::default();
425        msg.set_body(|body| {
426            body.messages.push(msg1.clone().into());
427            body.messages.push(msg2.clone().into());
428        });
429        let mut buf = BytesMut::with_capacity(msg.encoded_size());
430        msg.encode(&mut buf);
431
432        let msg3 = OutMessage::decode(&buf)?.1;
433        let msg4 = OutMessage::decode(&msg3.body.data().unwrap())?.1;
434        assert_eq!(msg1.properties, msg4.properties);
435
436        let msg5 = OutMessage::decode(&msg3.body.data[1])?.1;
437        assert_eq!(msg2.properties, msg5.properties);
438        Ok(())
439    }
440}