1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::convert::TryFrom;
use std::sync::Arc;

mod amqp;
mod builder;

use crate::codec::{Decoder, Encoder};
use crate::error::{DecodeError, EncodeError};
use crate::message::amqp::{AmqpDecoder, AmqpEncoder};

use amqp::AmqpMessage;

pub use self::amqp::{
    AmqpDecodeError, AmqpEncodeError, AnnonationKey, Annotations, ApplicationProperties,
    DeliveryAnnotations, Footer, Header, Map, MessageAnnotations, Properties, SimpleValue, Value,
};

use self::builder::MessageBuilder;

/// API for inbound and outbound messages
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Message(Arc<InternalMessage>);

#[derive(Debug, PartialEq, Eq, Default)]
pub struct InternalMessage {
    pub(crate) publishing_id: Option<u64>,
    pub(crate) message: AmqpMessage,
}

impl Encoder for Message {
    fn encoded_size(&self) -> u32 {
        self.0.message.encoded_size()
    }

    fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> {
        self.0.message.encode(writer)?;
        Ok(())
    }
}

impl Message {
    #[cfg(test)]
    pub(crate) fn new(internal: InternalMessage) -> Message {
        Message(Arc::new(internal))
    }

    /// Builder for creating [`Message`]
    pub fn builder() -> MessageBuilder {
        MessageBuilder(InternalMessage {
            message: AmqpMessage::default(),
            publishing_id: None,
        })
    }

    /// Extract a value as reference from the `amqp-value` section of the body if present
    pub fn value_ref<'a, T>(&'a self) -> Result<Option<T>, DecodeError>
    where
        T: TryFrom<&'a Value, Error = DecodeError>,
    {
        self.0
            .message
            .body()
            .value()
            .map(|value| T::try_from(value))
            .transpose()
    }

    /// Get the data associated to the message if any
    pub fn data(&self) -> Option<&[u8]> {
        self.0.message.body().data().map(|data| data.as_slice())
    }

    /// Get the properties of the message
    pub fn properties(&self) -> Option<&Properties> {
        self.0.message.properties()
    }
    /// Get the header of the message
    pub fn header(&self) -> Option<&Header> {
        self.0.message.header()
    }

    /// Get the annotations of the message
    pub fn message_annotations(&self) -> Option<&MessageAnnotations> {
        self.0.message.message_annotations()
    }
    /// Get the application properties of the message
    pub fn application_properties(&self) -> Option<&ApplicationProperties> {
        self.0.message.application_properties()
    }

    /// Get the delivery annotations of the message
    pub fn delivery_annotations(&self) -> Option<&DeliveryAnnotations> {
        self.0.message.delivery_annotations()
    }

    /// Get a reference to the message's publishing id.
    pub fn publishing_id(&self) -> Option<&u64> {
        self.0.publishing_id.as_ref()
    }
}

impl Decoder for Message {
    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
        AmqpMessage::decode(input)
            .map(|(remaining, message)| {
                (
                    remaining,
                    Message(Arc::new(InternalMessage {
                        publishing_id: None,
                        message,
                    })),
                )
            })
            .map(Ok)?
    }
}

impl From<Message> for Vec<Message> {
    fn from(message: Message) -> Self {
        vec![message]
    }
}

impl From<AmqpEncodeError> for EncodeError {
    fn from(err: AmqpEncodeError) -> Self {
        match err {
            AmqpEncodeError::Io(err) => EncodeError::Io(err),
        }
    }
}

impl From<AmqpDecodeError> for DecodeError {
    fn from(err: AmqpDecodeError) -> Self {
        match err {
            AmqpDecodeError::InvalidTypeCode(code) => DecodeError::InvalidFormatCode(code),
            AmqpDecodeError::MessageParse(err) => DecodeError::MessageParse(err),
            AmqpDecodeError::Incomplete(err) => DecodeError::Incomplete(err),
            AmqpDecodeError::Utf8Error(err) => DecodeError::Utf8Error(err),
            AmqpDecodeError::UuidError(err) => DecodeError::MessageParse(err.to_string()),
            AmqpDecodeError::InvalidTypeCodeFor { target, code } => {
                DecodeError::MessageParse(format!("Invalid type code {:?} for {}", code, target))
            }
        }
    }
}