rabbitmq_stream_protocol/message/
mod.rs

1use std::convert::TryFrom;
2use std::sync::Arc;
3
4mod amqp;
5mod builder;
6
7use crate::codec::{Decoder, Encoder};
8use crate::error::{DecodeError, EncodeError};
9use crate::message::amqp::{AmqpDecoder, AmqpEncoder};
10
11use amqp::AmqpMessage;
12
13pub use self::amqp::{
14    AmqpDecodeError, AmqpEncodeError, AnnonationKey, Annotations, ApplicationProperties,
15    DeliveryAnnotations, Footer, Header, Map, MessageAnnotations, Properties, SimpleValue, Value,
16};
17
18use self::builder::MessageBuilder;
19
20/// API for inbound and outbound messages
21#[derive(Debug, PartialEq, Eq, Clone)]
22pub struct Message(Arc<InternalMessage>);
23
24#[derive(Debug, PartialEq, Eq, Default)]
25pub struct InternalMessage {
26    pub(crate) publishing_id: Option<u64>,
27    pub(crate) message: AmqpMessage,
28}
29
30impl Encoder for Message {
31    fn encoded_size(&self) -> u32 {
32        self.0.message.encoded_size()
33    }
34
35    fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> {
36        self.0.message.encode(writer)?;
37        Ok(())
38    }
39}
40
41impl Message {
42    #[cfg(test)]
43    pub(crate) fn new(internal: InternalMessage) -> Message {
44        Message(Arc::new(internal))
45    }
46
47    /// Builder for creating [`Message`]
48    pub fn builder() -> MessageBuilder {
49        MessageBuilder(InternalMessage {
50            message: AmqpMessage::default(),
51            publishing_id: None,
52        })
53    }
54
55    /// Extract a value as reference from the `amqp-value` section of the body if present
56    pub fn value_ref<'a, T>(&'a self) -> Result<Option<T>, DecodeError>
57    where
58        T: TryFrom<&'a Value, Error = DecodeError>,
59    {
60        self.0
61            .message
62            .body()
63            .value()
64            .map(|value| T::try_from(value))
65            .transpose()
66    }
67
68    /// Get the data associated to the message if any
69    pub fn data(&self) -> Option<&[u8]> {
70        self.0.message.body().data().map(|data| data.as_slice())
71    }
72
73    /// Get the properties of the message
74    pub fn properties(&self) -> Option<&Properties> {
75        self.0.message.properties()
76    }
77    /// Get the header of the message
78    pub fn header(&self) -> Option<&Header> {
79        self.0.message.header()
80    }
81
82    /// Get the annotations of the message
83    pub fn message_annotations(&self) -> Option<&MessageAnnotations> {
84        self.0.message.message_annotations()
85    }
86    /// Get the application properties of the message
87    pub fn application_properties(&self) -> Option<&ApplicationProperties> {
88        self.0.message.application_properties()
89    }
90
91    /// Get the delivery annotations of the message
92    pub fn delivery_annotations(&self) -> Option<&DeliveryAnnotations> {
93        self.0.message.delivery_annotations()
94    }
95
96    /// Get a reference to the message's publishing id.
97    pub fn publishing_id(&self) -> Option<&u64> {
98        self.0.publishing_id.as_ref()
99    }
100}
101
102impl Decoder for Message {
103    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
104        AmqpMessage::decode(input)
105            .map(|(remaining, message)| {
106                (
107                    remaining,
108                    Message(Arc::new(InternalMessage {
109                        publishing_id: None,
110                        message,
111                    })),
112                )
113            })
114            .map(Ok)?
115    }
116}
117
118impl From<Message> for Vec<Message> {
119    fn from(message: Message) -> Self {
120        vec![message]
121    }
122}
123
124impl From<AmqpEncodeError> for EncodeError {
125    fn from(err: AmqpEncodeError) -> Self {
126        match err {
127            AmqpEncodeError::Io(err) => EncodeError::Io(err),
128        }
129    }
130}
131
132impl From<AmqpDecodeError> for DecodeError {
133    fn from(err: AmqpDecodeError) -> Self {
134        match err {
135            AmqpDecodeError::InvalidTypeCode(code) => DecodeError::InvalidFormatCode(code),
136            AmqpDecodeError::MessageParse(err) => DecodeError::MessageParse(err),
137            AmqpDecodeError::Incomplete(err) => DecodeError::Incomplete(err),
138            AmqpDecodeError::Utf8Error(err) => DecodeError::Utf8Error(err),
139            AmqpDecodeError::UuidError(err) => DecodeError::MessageParse(err.to_string()),
140            AmqpDecodeError::InvalidTypeCodeFor { target, code } => {
141                DecodeError::MessageParse(format!("Invalid type code {:?} for {}", code, target))
142            }
143        }
144    }
145}