1use std::cell::Cell;
2
3use ntex_bytes::{Bytes, BytesMut};
4
5use crate::codec::{Decode, Encode};
6use crate::error::AmqpParseError;
7use crate::protocol::{Annotations, Header, MessageFormat, Properties, Section, TransferBody};
8use crate::types::{Descriptor, Str, Symbol, Variant, VecStringMap, VecSymbolMap};
9
10use super::body::MessageBody;
11use super::SECTION_PREFIX_LENGTH;
12
13#[derive(Debug, Clone, Default, PartialEq, Eq)]
14pub struct Message(pub Box<MessageInner>);
15
16#[derive(Debug, Clone, Default, PartialEq, Eq)]
17pub struct MessageInner {
18 pub message_format: Option<MessageFormat>,
19 pub header: Option<Header>,
20 pub delivery_annotations: Option<VecSymbolMap>,
21 pub message_annotations: Option<VecSymbolMap>,
22 pub properties: Option<Properties>,
23 pub application_properties: Option<VecStringMap>,
24 pub footer: Option<Annotations>,
25 pub body: MessageBody,
26 size: Cell<usize>,
27}
28
29impl Message {
30 #[inline]
31 pub fn with_body(body: Bytes) -> Message {
33 let mut msg = Message::default();
34 msg.0.body.data.push(body);
35 msg.0.message_format = Some(0);
36 msg
37 }
38
39 #[inline]
40 pub fn with_messages(messages: Vec<TransferBody>) -> Message {
42 let mut msg = Message::default();
43 msg.0.body.messages = messages;
44 msg.0.message_format = Some(0);
45 msg
46 }
47
48 #[inline]
49 pub fn header(&self) -> Option<&Header> {
51 self.0.header.as_ref()
52 }
53
54 #[inline]
55 pub fn set_header(&mut self, header: Header) -> &mut Self {
57 self.0.header = Some(header);
58 self.0.size.set(0);
59 self
60 }
61
62 #[inline]
63 pub fn set_format(&mut self, format: MessageFormat) -> &mut Self {
65 self.0.message_format = Some(format);
66 self
67 }
68
69 #[inline]
70 pub fn properties(&self) -> Option<&Properties> {
72 self.0.properties.as_ref()
73 }
74
75 #[inline]
76 pub fn properties_mut(&mut self) -> &mut Properties {
78 if self.0.properties.is_none() {
79 self.0.properties = Some(Properties::default());
80 }
81
82 self.0.size.set(0);
83 self.0.properties.as_mut().unwrap()
84 }
85
86 #[inline]
87 pub fn set_properties<F>(&mut self, f: F) -> &mut Self
89 where
90 F: FnOnce(&mut Properties),
91 {
92 if let Some(ref mut props) = self.0.properties {
93 f(props);
94 } else {
95 let mut props = Properties::default();
96 f(&mut props);
97 self.0.properties = Some(props);
98 }
99 self.0.size.set(0);
100 self
101 }
102
103 #[inline]
104 pub fn app_properties(&self) -> Option<&VecStringMap> {
106 self.0.application_properties.as_ref()
107 }
108
109 #[inline]
110 pub fn app_properties_mut(&mut self) -> Option<&mut VecStringMap> {
112 self.0.application_properties.as_mut()
113 }
114
115 #[inline]
116 pub fn app_property(&self, key: &str) -> Option<&Variant> {
118 if let Some(ref props) = self.0.application_properties {
119 props
120 .iter()
121 .find_map(|item| if &item.0 == key { Some(&item.1) } else { None })
122 } else {
123 None
124 }
125 }
126
127 #[inline]
128 pub fn set_app_property<K, V>(&mut self, key: K, value: V) -> &mut Self
130 where
131 K: Into<Str>,
132 V: Into<Variant>,
133 {
134 if let Some(ref mut props) = self.0.application_properties {
135 props.push((key.into(), value.into()));
136 } else {
137 let mut props = VecStringMap::default();
138 props.push((key.into(), value.into()));
139 self.0.application_properties = Some(props);
140 }
141 self.0.size.set(0);
142 self
143 }
144
145 #[inline]
146 pub fn message_annotation(&self, key: &str) -> Option<&Variant> {
148 if let Some(ref props) = self.0.message_annotations {
149 props
150 .iter()
151 .find_map(|item| if &item.0 == key { Some(&item.1) } else { None })
152 } else {
153 None
154 }
155 }
156
157 #[inline]
158 pub fn add_message_annotation<K, V>(&mut self, key: K, value: V) -> &mut Self
160 where
161 K: Into<Symbol>,
162 V: Into<Variant>,
163 {
164 if let Some(ref mut props) = self.0.message_annotations {
165 props.push((key.into(), value.into()));
166 } else {
167 let mut props = VecSymbolMap::default();
168 props.push((key.into(), value.into()));
169 self.0.message_annotations = Some(props);
170 }
171 self.0.size.set(0);
172 self
173 }
174
175 #[inline]
176 pub fn message_annotations(&self) -> Option<&VecSymbolMap> {
178 self.0.message_annotations.as_ref()
179 }
180
181 #[inline]
182 pub fn message_annotations_mut(&mut self) -> Option<&mut VecSymbolMap> {
184 self.0.message_annotations.as_mut()
185 }
186
187 #[inline]
188 pub fn delivery_annotations(&self) -> Option<&VecSymbolMap> {
190 self.0.delivery_annotations.as_ref()
191 }
192
193 #[inline]
194 pub fn delivery_annotations_mut(&mut self) -> Option<&mut VecSymbolMap> {
196 self.0.delivery_annotations.as_mut()
197 }
198
199 #[inline]
200 pub fn update<F>(self, f: F) -> Self
202 where
203 F: Fn(Self) -> Self,
204 {
205 self.0.size.set(0);
206 f(self)
207 }
208
209 #[inline]
210 pub fn if_some<T, F>(self, value: &Option<T>, f: F) -> Self
212 where
213 F: Fn(Self, &T) -> Self,
214 {
215 if let Some(ref val) = value {
216 self.0.size.set(0);
217 f(self, val)
218 } else {
219 self
220 }
221 }
222
223 #[inline]
224 pub fn body(&self) -> &MessageBody {
226 &self.0.body
227 }
228
229 #[inline]
230 pub fn body_mut(&mut self) -> &mut MessageBody {
232 &mut self.0.body
233 }
234
235 #[inline]
236 pub fn value(&self) -> Option<&Variant> {
238 self.0.body.value.as_ref()
239 }
240
241 #[inline]
242 pub fn set_value<V: Into<Variant>>(&mut self, v: V) -> &mut Self {
244 self.0.body.value = Some(v.into());
245 self
246 }
247
248 #[inline]
249 pub fn set_body<F>(&mut self, f: F) -> &mut Self
251 where
252 F: FnOnce(&mut MessageBody),
253 {
254 f(&mut self.0.body);
255 self.0.size.set(0);
256 self
257 }
258
259 #[inline]
260 pub fn reply_message(&self) -> Message {
262 Message::default().if_some(&self.0.properties, |mut msg, data| {
263 msg.set_properties(|props| props.correlation_id.clone_from(&data.message_id));
264 msg
265 })
266 }
267}
268
269impl Decode for Message {
270 fn decode(input: &mut Bytes) -> Result<Message, AmqpParseError> {
271 let mut message = Message::default();
272
273 loop {
274 if input.is_empty() {
275 break;
276 }
277
278 let sec = Section::decode(input)?;
279 match sec {
280 Section::Header(val) => {
281 message.0.header = Some(val);
282 }
283 Section::DeliveryAnnotations(val) => {
284 message.0.delivery_annotations = Some(val);
285 }
286 Section::MessageAnnotations(val) => {
287 message.0.message_annotations = Some(val);
288 }
289 Section::ApplicationProperties(val) => {
290 message.0.application_properties = Some(val);
291 }
292 Section::Footer(val) => {
293 message.0.footer = Some(val);
294 }
295 Section::Properties(val) => {
296 message.0.properties = Some(val);
297 }
298
299 Section::AmqpSequence(val) => {
301 message.0.body.sequence.push(val);
302 }
303 Section::AmqpValue(val) => {
304 message.0.body.value = Some(val);
305 }
306 Section::Data(val) => {
307 message.0.body.data.push(val);
308 }
309 }
310 }
311 Ok(message)
312 }
313}
314
315impl Encode for Message {
316 fn encoded_size(&self) -> usize {
317 let size = self.0.size.get();
318 if size != 0 {
319 return size;
320 }
321
322 let mut size = self.0.body.encoded_size();
323
324 if let Some(ref h) = self.0.header {
325 size += h.encoded_size();
326 }
327 if let Some(ref da) = self.0.delivery_annotations {
328 size += da.encoded_size() + SECTION_PREFIX_LENGTH;
329 }
330 if let Some(ref ma) = self.0.message_annotations {
331 size += ma.encoded_size() + SECTION_PREFIX_LENGTH;
332 }
333 if let Some(ref p) = self.0.properties {
334 size += p.encoded_size();
335 }
336 if let Some(ref ap) = self.0.application_properties {
337 size += ap.encoded_size() + SECTION_PREFIX_LENGTH;
338 }
339 if let Some(ref f) = self.0.footer {
340 size += f.encoded_size() + SECTION_PREFIX_LENGTH;
341 }
342 self.0.size.set(size);
343 size
344 }
345
346 fn encode(&self, dst: &mut BytesMut) {
347 if let Some(ref h) = self.0.header {
348 h.encode(dst);
349 }
350 if let Some(ref da) = self.0.delivery_annotations {
351 Descriptor::Ulong(113).encode(dst);
352 da.encode(dst);
353 }
354 if let Some(ref ma) = self.0.message_annotations {
355 Descriptor::Ulong(114).encode(dst);
356 ma.encode(dst);
357 }
358 if let Some(ref p) = self.0.properties {
359 p.encode(dst);
360 }
361 if let Some(ref ap) = self.0.application_properties {
362 Descriptor::Ulong(116).encode(dst);
363 ap.encode(dst);
364 }
365
366 self.0.body.encode(dst);
368
369 if let Some(ref f) = self.0.footer {
371 Descriptor::Ulong(120).encode(dst);
372 f.encode(dst);
373 }
374 }
375}
376
#[cfg(test)]
mod tests {
    use ntex_bytes::{ByteString, Bytes, BytesMut};
    use uuid::Uuid;

    use crate::codec::{Decode, Encode};
    use crate::error::AmqpCodecError;
    use crate::protocol::Header;
    use crate::types::Variant;

    use super::Message;

    /// Encode `msg` into a buffer pre-sized from its reported encoded size.
    fn encode_message(msg: &Message) -> BytesMut {
        let mut buf = BytesMut::with_capacity(msg.encoded_size());
        msg.encode(&mut buf);
        buf
    }

    /// Encode `msg` and decode the resulting bytes back into a message.
    fn roundtrip(msg: &Message) -> Result<Message, AmqpCodecError> {
        let mut wire = encode_message(msg).freeze();
        Ok(Message::decode(&mut wire)?)
    }

    /// A message id set through `set_properties` survives a round trip.
    #[test]
    fn test_properties() -> Result<(), AmqpCodecError> {
        let mut msg = Message::default();
        msg.set_properties(|props| props.message_id = Some(1.into()));

        let decoded = roundtrip(&msg)?;
        let props = decoded.properties().unwrap();
        assert_eq!(props.message_id, Some(1.into()));
        Ok(())
    }

    /// An application property survives a round trip.
    #[test]
    fn test_app_properties() -> Result<(), AmqpCodecError> {
        let mut msg = Message::default();
        msg.set_app_property(ByteString::from("test"), 1);

        let decoded = roundtrip(&msg)?;
        let props = decoded.app_properties().unwrap();
        assert_eq!(props[0].0.as_str(), "test");
        assert_eq!(props[0].1, Variant::from(1));
        Ok(())
    }

    /// The header section survives a round trip.
    #[test]
    fn test_header() -> Result<(), AmqpCodecError> {
        let hdr = Header {
            durable: false,
            priority: 1,
            ttl: None,
            first_acquirer: false,
            delivery_count: 1,
        };

        let mut msg = Message::default();
        msg.set_header(hdr.clone());

        let decoded = roundtrip(&msg)?;
        assert_eq!(decoded.header().unwrap(), &hdr);
        Ok(())
    }

    /// A data body survives a round trip.
    #[test]
    fn test_data() -> Result<(), AmqpCodecError> {
        let data = Bytes::from_static(b"test data");

        let mut msg = Message::default();
        msg.set_body(|body| body.set_data(data.clone()));

        let decoded = roundtrip(&msg)?;
        assert_eq!(decoded.body().data().unwrap(), &data);
        Ok(())
    }

    /// A default message encodes to zero bytes and decodes without data.
    #[test]
    fn test_data_empty() -> Result<(), AmqpCodecError> {
        let msg = Message::default();
        let buf = encode_message(&msg);
        assert_eq!(buf, Bytes::from_static(b""));

        let decoded = Message::decode(&mut buf.freeze())?;
        assert!(decoded.body().data().is_none());
        Ok(())
    }

    /// Nested messages come back as data sections that decode individually.
    #[test]
    fn test_messages() -> Result<(), AmqpCodecError> {
        let mut first = Message::default();
        first.set_properties(|props| props.message_id = Some(1.into()));
        let mut second = Message::default();
        second.set_properties(|props| props.message_id = Some(2.into()));

        let mut outer = Message::default();
        outer.set_body(|body| {
            body.messages.push(first.clone().into());
            body.messages.push(second.clone().into());
        });

        let decoded = roundtrip(&outer)?;

        let mut first_wire = decoded.body().data().unwrap().clone();
        let first_decoded = Message::decode(&mut first_wire)?;
        assert_eq!(first.properties(), first_decoded.properties());

        let mut second_wire = decoded.body().data[1].clone();
        let second_decoded = Message::decode(&mut second_wire)?;
        assert_eq!(second.properties(), second_decoded.properties());
        Ok(())
    }

    /// A UUID message id survives a round trip.
    #[test]
    fn test_messages_codec() -> Result<(), AmqpCodecError> {
        let mut msg = Message::default();
        msg.set_properties(|props| props.message_id = Some(Uuid::new_v4().into()));

        let decoded = roundtrip(&msg)?;
        assert_eq!(msg.properties(), decoded.properties());
        Ok(())
    }
}