1use std::cell::Cell;
2
3use bytes::{BufMut, Bytes, BytesMut};
4
5use crate::codec::{Decode, Encode, FORMATCODE_BINARY8};
6use crate::errors::AmqpParseError;
7use crate::protocol::{Annotations, Header, MessageFormat, Properties, Section, TransferBody};
8use crate::types::{Descriptor, Str, Symbol, Variant, VecStringMap, VecSymbolMap};
9
10use super::body::MessageBody;
11use super::inmessage::InMessage;
12use super::SECTION_PREFIX_LENGTH;
13
14#[derive(Debug, Clone, Default, PartialEq)]
15pub struct OutMessage {
16 pub message_format: Option<MessageFormat>,
17 header: Option<Header>,
18 delivery_annotations: Option<Annotations>,
19 message_annotations: Option<VecSymbolMap>,
20 properties: Option<Properties>,
21 application_properties: Option<VecStringMap>,
22 footer: Option<Annotations>,
23 body: MessageBody,
24 size: Cell<usize>,
25}
26
27impl OutMessage {
28 pub fn with_body(body: Bytes) -> OutMessage {
30 let mut msg = OutMessage::default();
31 msg.body.data.push(body);
32 msg.message_format = Some(0);
33 msg
34 }
35
36 pub fn with_messages(messages: Vec<TransferBody>) -> OutMessage {
38 let mut msg = OutMessage::default();
39 msg.body.messages = messages;
40 msg.message_format = Some(0);
41 msg
42 }
43
44 pub fn header(&self) -> Option<&Header> {
46 self.header.as_ref()
47 }
48
49 pub fn set_header(&mut self, header: Header) -> &mut Self {
51 self.header = Some(header);
52 self.size.set(0);
53 self
54 }
55
56 pub fn properties(&self) -> Option<&Properties> {
58 self.properties.as_ref()
59 }
60
61 pub fn properties_mut(&mut self) -> &mut Properties {
63 if self.properties.is_none() {
64 self.properties = Some(Properties::default());
65 }
66
67 self.size.set(0);
68 self.properties.as_mut().unwrap()
69 }
70
71 pub fn set_properties<F>(&mut self, f: F) -> &mut Self
73 where
74 F: Fn(&mut Properties),
75 {
76 if let Some(ref mut props) = self.properties {
77 f(props);
78 } else {
79 let mut props = Properties::default();
80 f(&mut props);
81 self.properties = Some(props);
82 }
83 self.size.set(0);
84 self
85 }
86
87 pub fn app_properties(&self) -> Option<&VecStringMap> {
89 self.application_properties.as_ref()
90 }
91
92 pub fn set_app_property<K, V>(&mut self, key: K, value: V) -> &mut Self
94 where
95 K: Into<Str>,
96 V: Into<Variant>,
97 {
98 if let Some(ref mut props) = self.application_properties {
99 props.push((key.into(), value.into()));
100 } else {
101 let mut props = VecStringMap::default();
102 props.push((key.into(), value.into()));
103 self.application_properties = Some(props);
104 }
105 self.size.set(0);
106 self
107 }
108
109 pub fn add_message_annotation<K, V>(&mut self, key: K, value: V) -> &mut Self
111 where
112 K: Into<Symbol>,
113 V: Into<Variant>,
114 {
115 if let Some(ref mut props) = self.message_annotations {
116 props.push((key.into(), value.into()));
117 } else {
118 let mut props = VecSymbolMap::default();
119 props.push((key.into(), value.into()));
120 self.message_annotations = Some(props);
121 }
122 self.size.set(0);
123 self
124 }
125
126 pub fn update<F>(self, f: F) -> Self
128 where
129 F: Fn(Self) -> Self,
130 {
131 self.size.set(0);
132 f(self)
133 }
134
135 pub fn if_some<T, F>(self, value: &Option<T>, f: F) -> Self
137 where
138 F: Fn(Self, &T) -> Self,
139 {
140 if let Some(ref val) = value {
141 self.size.set(0);
142 f(self, val)
143 } else {
144 self
145 }
146 }
147
148 pub fn body(&self) -> &MessageBody {
150 &self.body
151 }
152
153 pub fn value(&self) -> Option<&Variant> {
155 self.body.value.as_ref()
156 }
157
158 pub fn set_value<V: Into<Variant>>(&mut self, v: V) -> &mut Self {
160 self.body.value = Some(v.into());
161 self
162 }
163
164 pub fn set_body<F>(&mut self, f: F) -> &mut Self
166 where
167 F: Fn(&mut MessageBody),
168 {
169 f(&mut self.body);
170 self.size.set(0);
171 self
172 }
173
174 pub fn reply_message(&self) -> OutMessage {
176 OutMessage::default().if_some(&self.properties, |mut msg, data| {
177 msg.set_properties(|props| props.correlation_id = data.message_id.clone());
178 msg
179 })
180 }
181}
182
183impl From<InMessage> for OutMessage {
184 fn from(from: InMessage) -> Self {
185 let mut msg = OutMessage {
186 message_format: from.message_format,
187 header: from.header,
188 properties: from.properties,
189 delivery_annotations: from.delivery_annotations,
190 message_annotations: from.message_annotations.map(|v| v.into()),
191 application_properties: from.application_properties.map(|v| v.into()),
192 footer: from.footer,
193 body: from.body,
194 size: Cell::new(0),
195 };
196
197 if let Some(ref mut props) = msg.properties {
198 props.correlation_id = props.message_id.clone();
199 };
200
201 msg
202 }
203}
204
205impl Decode for OutMessage {
206 fn decode(mut input: &[u8]) -> Result<(&[u8], OutMessage), AmqpParseError> {
207 let mut message = OutMessage::default();
208
209 loop {
210 let (buf, sec) = Section::decode(input)?;
211 match sec {
212 Section::Header(val) => {
213 message.header = Some(val);
214 }
215 Section::DeliveryAnnotations(val) => {
216 message.delivery_annotations = Some(val);
217 }
218 Section::MessageAnnotations(val) => {
219 message.message_annotations = Some(VecSymbolMap(
220 val.into_iter().map(|(k, v)| (k.0.into(), v)).collect(),
221 ));
222 }
223 Section::ApplicationProperties(val) => {
224 message.application_properties = Some(VecStringMap(
225 val.into_iter().map(|(k, v)| (k.into(), v)).collect(),
226 ));
227 }
228 Section::Footer(val) => {
229 message.footer = Some(val);
230 }
231 Section::Properties(val) => {
232 message.properties = Some(val);
233 }
234
235 Section::AmqpSequence(val) => {
237 message.body.sequence.push(val);
238 }
239 Section::AmqpValue(val) => {
240 message.body.value = Some(val);
241 }
242 Section::Data(val) => {
243 message.body.data.push(val);
244 }
245 }
246 if buf.is_empty() {
247 break;
248 }
249 input = buf;
250 }
251 Ok((input, message))
252 }
253}
254
255impl Encode for OutMessage {
256 fn encoded_size(&self) -> usize {
257 let size = self.size.get();
258 if size != 0 {
259 return size;
260 }
261
262 let body_size = self.body.encoded_size();
264 let mut size = if body_size == 0 {
265 SECTION_PREFIX_LENGTH + 2
267 } else {
268 body_size
269 };
270
271 if let Some(ref h) = self.header {
272 size += h.encoded_size() + SECTION_PREFIX_LENGTH;
273 }
274 if let Some(ref da) = self.delivery_annotations {
275 size += da.encoded_size() + SECTION_PREFIX_LENGTH;
276 }
277 if let Some(ref ma) = self.message_annotations {
278 size += ma.encoded_size() + SECTION_PREFIX_LENGTH;
279 }
280 if let Some(ref p) = self.properties {
281 size += p.encoded_size();
282 }
283 if let Some(ref ap) = self.application_properties {
284 size += ap.encoded_size() + SECTION_PREFIX_LENGTH;
285 }
286 if let Some(ref f) = self.footer {
287 size += f.encoded_size() + SECTION_PREFIX_LENGTH;
288 }
289 self.size.set(size);
290 size
291 }
292
293 fn encode(&self, dst: &mut BytesMut) {
294 if let Some(ref h) = self.header {
295 h.encode(dst);
296 }
297 if let Some(ref da) = self.delivery_annotations {
298 Descriptor::Ulong(113).encode(dst);
299 da.encode(dst);
300 }
301 if let Some(ref ma) = self.message_annotations {
302 Descriptor::Ulong(114).encode(dst);
303 ma.encode(dst);
304 }
305 if let Some(ref p) = self.properties {
306 p.encode(dst);
307 }
308 if let Some(ref ap) = self.application_properties {
309 Descriptor::Ulong(116).encode(dst);
310 ap.encode(dst);
311 }
312
313 if self.body.encoded_size() == 0 {
315 Descriptor::Ulong(117).encode(dst);
317 dst.put_u8(FORMATCODE_BINARY8);
318 dst.put_u8(0);
319 } else {
320 self.body.encode(dst);
321 }
322
323 if let Some(ref f) = self.footer {
325 Descriptor::Ulong(120).encode(dst);
326 f.encode(dst);
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use bytestring::ByteString;

    use crate::codec::{Decode, Encode};
    use crate::errors::AmqpCodecError;
    use crate::protocol::Header;
    use crate::types::Variant;

    use super::OutMessage;

    /// Encodes `msg` into a buffer sized by `encoded_size()` and decodes the
    /// bytes back into a new message.
    fn roundtrip(msg: &OutMessage) -> Result<OutMessage, AmqpCodecError> {
        let mut buf = BytesMut::with_capacity(msg.encoded_size());
        msg.encode(&mut buf);
        let (_, decoded) = OutMessage::decode(&buf)?;
        Ok(decoded)
    }

    /// Properties set through `set_properties` survive an encode/decode cycle.
    #[test]
    fn test_properties() -> Result<(), AmqpCodecError> {
        let mut original = OutMessage::default();
        original.set_properties(|props| props.message_id = Some(1.into()));

        let decoded = roundtrip(&original)?;
        let props = decoded.properties.as_ref().unwrap();
        assert_eq!(props.message_id, Some(1.into()));
        Ok(())
    }

    /// Application properties survive an encode/decode cycle.
    #[test]
    fn test_app_properties() -> Result<(), AmqpCodecError> {
        let mut original = OutMessage::default();
        original.set_app_property(ByteString::from("test"), 1);

        let decoded = roundtrip(&original)?;
        let props = decoded.application_properties.as_ref().unwrap();
        assert_eq!(props[0].0.as_str(), "test");
        assert_eq!(props[0].1, Variant::from(1));
        Ok(())
    }

    /// The header section survives an encode/decode cycle.
    #[test]
    fn test_header() -> Result<(), AmqpCodecError> {
        let hdr = Header {
            durable: false,
            priority: 1,
            ttl: None,
            first_acquirer: false,
            delivery_count: 1,
        };

        let mut original = OutMessage::default();
        original.set_header(hdr.clone());

        let decoded = roundtrip(&original)?;
        assert_eq!(decoded.header().unwrap(), &hdr);
        Ok(())
    }

    /// A data body survives an encode/decode cycle.
    #[test]
    fn test_data() -> Result<(), AmqpCodecError> {
        let data = Bytes::from_static(b"test data");

        let mut original = OutMessage::default();
        original.set_body(|body| body.set_data(data.clone()));

        let decoded = roundtrip(&original)?;
        assert_eq!(decoded.body.data().unwrap(), &data);
        Ok(())
    }

    /// A message without any body decodes to a single empty data entry.
    #[test]
    fn test_data_empty() -> Result<(), AmqpCodecError> {
        let decoded = roundtrip(&OutMessage::default())?;
        assert_eq!(decoded.body.data().unwrap(), &Bytes::from_static(b""));
        Ok(())
    }

    /// Nested messages are encoded as data entries that decode back into the
    /// original messages.
    #[test]
    fn test_messages() -> Result<(), AmqpCodecError> {
        let mut first = OutMessage::default();
        first.set_properties(|props| props.message_id = Some(1.into()));
        let mut second = OutMessage::default();
        second.set_properties(|props| props.message_id = Some(2.into()));

        let mut outer = OutMessage::default();
        outer.set_body(|body| {
            body.messages.push(first.clone().into());
            body.messages.push(second.clone().into());
        });

        let decoded_outer = roundtrip(&outer)?;

        let decoded_first = OutMessage::decode(&decoded_outer.body.data().unwrap())?.1;
        assert_eq!(first.properties, decoded_first.properties);

        let decoded_second = OutMessage::decode(&decoded_outer.body.data[1])?.1;
        assert_eq!(second.properties, decoded_second.properties);
        Ok(())
    }
}