1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::convert::TryFrom;
use std::sync::Arc;
mod amqp;
mod builder;
use crate::codec::{Decoder, Encoder};
use crate::error::{DecodeError, EncodeError};
use crate::message::amqp::{AmqpDecoder, AmqpEncoder};
use amqp::AmqpMessage;
pub use self::amqp::{
AmqpDecodeError, AmqpEncodeError, AnnonationKey, Annotations, ApplicationProperties,
DeliveryAnnotations, Footer, Header, Map, MessageAnnotations, Properties, SimpleValue, Value,
};
use self::builder::MessageBuilder;
/// A message whose contents live behind an [`Arc`], so `clone` yields another
/// handle to the same [`InternalMessage`] rather than a copy of it.
#[derive(Debug, PartialEq, Clone)]
pub struct Message(Arc<InternalMessage>);
/// The data held by a [`Message`]: an optional publishing id plus the AMQP
/// message itself.
#[derive(Debug, PartialEq, Default)]
pub struct InternalMessage {
/// Publishing id, if any; `Message::builder` and `Message::decode` both start
/// with `None`.
pub(crate) publishing_id: Option<u64>,
/// The AMQP message; the encoder, decoder, and accessors on `Message` all
/// delegate to it.
pub(crate) message: AmqpMessage,
}
/// Encoding for [`Message`], delegated to the wrapped AMQP message.
///
/// Only `self.0.message` is consulted here; the publishing id is not read by
/// either method.
impl Encoder for Message {
    /// Returns the encoded size reported by the inner AMQP message.
    fn encoded_size(&self) -> u32 {
        self.0.message.encoded_size()
    }

    /// Writes the inner AMQP message to `writer`.
    ///
    /// # Errors
    ///
    /// Any error from the inner encoder is converted into
    /// [`EncodeError`] and returned.
    fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> {
        match self.0.message.encode(writer) {
            Ok(_) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}
impl Message {
    /// Wraps an already-assembled [`InternalMessage`] in a [`Message`].
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn new(internal: InternalMessage) -> Message {
        Self(Arc::new(internal))
    }

    /// Starts a [`MessageBuilder`] seeded with a default AMQP message and no
    /// publishing id.
    pub fn builder() -> MessageBuilder {
        let seed = InternalMessage {
            publishing_id: None,
            message: AmqpMessage::default(),
        };
        MessageBuilder(seed)
    }

    /// Converts the body's value, when there is one, into `T`.
    ///
    /// Returns `Ok(None)` when the body carries no value and `Ok(Some(_))`
    /// when the conversion succeeds.
    ///
    /// # Errors
    ///
    /// Returns the [`DecodeError`] produced by `T::try_from` when the
    /// conversion fails.
    pub fn value_ref<'a, T>(&'a self) -> Result<Option<T>, DecodeError>
    where
        T: TryFrom<&'a Value, Error = DecodeError>,
    {
        match self.0.message.body().value() {
            Some(value) => T::try_from(value).map(Some),
            None => Ok(None),
        }
    }

    /// Returns the body's data as a byte slice, or `None` when the body has
    /// no data.
    pub fn data(&self) -> Option<&[u8]> {
        let payload = self.0.message.body().data()?;
        Some(payload.as_slice())
    }

    /// Returns the [`Properties`] reported by the inner AMQP message, if any.
    pub fn properties(&self) -> Option<&Properties> {
        self.0.message.properties()
    }

    /// Returns the [`Header`] reported by the inner AMQP message, if any.
    pub fn header(&self) -> Option<&Header> {
        self.0.message.header()
    }

    /// Returns the [`MessageAnnotations`] reported by the inner AMQP message,
    /// if any.
    pub fn message_annotations(&self) -> Option<&MessageAnnotations> {
        self.0.message.message_annotations()
    }

    /// Returns the [`ApplicationProperties`] reported by the inner AMQP
    /// message, if any.
    pub fn application_properties(&self) -> Option<&ApplicationProperties> {
        self.0.message.application_properties()
    }

    /// Returns the [`DeliveryAnnotations`] reported by the inner AMQP message,
    /// if any.
    pub fn delivery_annotations(&self) -> Option<&DeliveryAnnotations> {
        self.0.message.delivery_annotations()
    }

    /// Returns a reference to the publishing id, or `None` when the field is
    /// unset.
    pub fn publishing_id(&self) -> Option<&u64> {
        self.0.publishing_id.as_ref()
    }
}
/// Decoding for [`Message`], delegated to the AMQP message decoder.
impl Decoder for Message {
    /// Decodes an AMQP message from `input` and wraps it in a [`Message`]
    /// with no publishing id.
    ///
    /// On success, returns the remaining input as handed back by the AMQP
    /// decoder together with the decoded message.
    ///
    /// # Errors
    ///
    /// Any error from the AMQP decoder is converted into
    /// [`DecodeError`] and returned.
    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
        let (remaining, message) = AmqpMessage::decode(input)?;
        let decoded = Message(Arc::new(InternalMessage {
            publishing_id: None,
            message,
        }));
        Ok((remaining, decoded))
    }
}
/// Lets a single [`Message`] be used where a `Vec<Message>` is expected.
impl From<Message> for Vec<Message> {
    /// Builds a one-element vector containing `message`.
    fn from(message: Message) -> Self {
        Vec::from([message])
    }
}
/// Converts AMQP-level encode failures into the crate-level [`EncodeError`].
impl From<AmqpEncodeError> for EncodeError {
    /// Maps `AmqpEncodeError::Io` onto `EncodeError::Io`, carrying the
    /// payload across unchanged.
    fn from(err: AmqpEncodeError) -> Self {
        // `Io` is the only variant matched by the original conversion, so the
        // pattern is irrefutable; adding a variant makes this a compile error.
        let AmqpEncodeError::Io(source) = err;
        Self::Io(source)
    }
}
/// Converts AMQP-level decode failures into the crate-level [`DecodeError`].
impl From<AmqpDecodeError> for DecodeError {
    /// Maps each [`AmqpDecodeError`] variant onto a [`DecodeError`] variant.
    ///
    /// `Incomplete`, `Utf8Error` and `MessageParse` keep their payload under
    /// the same variant name; `InvalidTypeCode` becomes `InvalidFormatCode`;
    /// `UuidError` and `InvalidTypeCodeFor` are folded into `MessageParse`
    /// as text.
    fn from(err: AmqpDecodeError) -> Self {
        match err {
            // Variants whose payload is forwarded as-is.
            AmqpDecodeError::Incomplete(inner) => Self::Incomplete(inner),
            AmqpDecodeError::Utf8Error(inner) => Self::Utf8Error(inner),
            AmqpDecodeError::MessageParse(reason) => Self::MessageParse(reason),
            AmqpDecodeError::InvalidTypeCode(code) => Self::InvalidFormatCode(code),
            // Variants rendered to a string and reported as parse failures.
            AmqpDecodeError::UuidError(inner) => Self::MessageParse(inner.to_string()),
            AmqpDecodeError::InvalidTypeCodeFor { target, code } => {
                Self::MessageParse(format!("Invalid type code {:?} for {}", code, target))
            }
        }
    }
}