rabbitmq_stream_protocol/message/
mod.rs1use std::convert::TryFrom;
2use std::sync::Arc;
3
4mod amqp;
5mod builder;
6
7use crate::codec::{Decoder, Encoder};
8use crate::error::{DecodeError, EncodeError};
9use crate::message::amqp::{AmqpDecoder, AmqpEncoder};
10
11use amqp::AmqpMessage;
12
13pub use self::amqp::{
14 AmqpDecodeError, AmqpEncodeError, AnnonationKey, Annotations, ApplicationProperties,
15 DeliveryAnnotations, Footer, Header, Map, MessageAnnotations, Properties, SimpleValue, Value,
16};
17
18use self::builder::MessageBuilder;
19
20#[derive(Debug, PartialEq, Eq, Clone)]
22pub struct Message(Arc<InternalMessage>);
23
24#[derive(Debug, PartialEq, Eq, Default)]
25pub struct InternalMessage {
26 pub(crate) publishing_id: Option<u64>,
27 pub(crate) message: AmqpMessage,
28}
29
30impl Encoder for Message {
31 fn encoded_size(&self) -> u32 {
32 self.0.message.encoded_size()
33 }
34
35 fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> {
36 self.0.message.encode(writer)?;
37 Ok(())
38 }
39}
40
41impl Message {
42 #[cfg(test)]
43 pub(crate) fn new(internal: InternalMessage) -> Message {
44 Message(Arc::new(internal))
45 }
46
47 pub fn builder() -> MessageBuilder {
49 MessageBuilder(InternalMessage {
50 message: AmqpMessage::default(),
51 publishing_id: None,
52 })
53 }
54
55 pub fn value_ref<'a, T>(&'a self) -> Result<Option<T>, DecodeError>
57 where
58 T: TryFrom<&'a Value, Error = DecodeError>,
59 {
60 self.0
61 .message
62 .body()
63 .value()
64 .map(|value| T::try_from(value))
65 .transpose()
66 }
67
68 pub fn data(&self) -> Option<&[u8]> {
70 self.0.message.body().data().map(|data| data.as_slice())
71 }
72
73 pub fn properties(&self) -> Option<&Properties> {
75 self.0.message.properties()
76 }
77 pub fn header(&self) -> Option<&Header> {
79 self.0.message.header()
80 }
81
82 pub fn message_annotations(&self) -> Option<&MessageAnnotations> {
84 self.0.message.message_annotations()
85 }
86 pub fn application_properties(&self) -> Option<&ApplicationProperties> {
88 self.0.message.application_properties()
89 }
90
91 pub fn delivery_annotations(&self) -> Option<&DeliveryAnnotations> {
93 self.0.message.delivery_annotations()
94 }
95
96 pub fn publishing_id(&self) -> Option<&u64> {
98 self.0.publishing_id.as_ref()
99 }
100}
101
102impl Decoder for Message {
103 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
104 AmqpMessage::decode(input)
105 .map(|(remaining, message)| {
106 (
107 remaining,
108 Message(Arc::new(InternalMessage {
109 publishing_id: None,
110 message,
111 })),
112 )
113 })
114 .map(Ok)?
115 }
116}
117
118impl From<Message> for Vec<Message> {
119 fn from(message: Message) -> Self {
120 vec![message]
121 }
122}
123
124impl From<AmqpEncodeError> for EncodeError {
125 fn from(err: AmqpEncodeError) -> Self {
126 match err {
127 AmqpEncodeError::Io(err) => EncodeError::Io(err),
128 }
129 }
130}
131
132impl From<AmqpDecodeError> for DecodeError {
133 fn from(err: AmqpDecodeError) -> Self {
134 match err {
135 AmqpDecodeError::InvalidTypeCode(code) => DecodeError::InvalidFormatCode(code),
136 AmqpDecodeError::MessageParse(err) => DecodeError::MessageParse(err),
137 AmqpDecodeError::Incomplete(err) => DecodeError::Incomplete(err),
138 AmqpDecodeError::Utf8Error(err) => DecodeError::Utf8Error(err),
139 AmqpDecodeError::UuidError(err) => DecodeError::MessageParse(err.to_string()),
140 AmqpDecodeError::InvalidTypeCodeFor { target, code } => {
141 DecodeError::MessageParse(format!("Invalid type code {:?} for {}", code, target))
142 }
143 }
144 }
145}