use std::convert::TryFrom;
use std::sync::Arc;
mod amqp;
mod builder;
use crate::codec::{Decoder, Encoder};
use crate::error::{DecodeError, EncodeError};
use crate::message::amqp::{AmqpDecoder, AmqpEncoder};
use amqp::AmqpMessage;
pub use self::amqp::{
AmqpDecodeError, AmqpEncodeError, AnnonationKey, Annotations, ApplicationProperties,
DeliveryAnnotations, Footer, Header, Map, MessageAnnotations, Properties, SimpleValue, Value,
};
use self::builder::MessageBuilder;
/// Public message handle.
///
/// A newtype around `Arc<InternalMessage>`: cloning a `Message` clones the
/// `Arc`, so clones share the same underlying [`InternalMessage`] instead of
/// copying it.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Message(Arc<InternalMessage>);
/// Data held behind the `Arc` inside [`Message`].
///
/// The derived `Default` yields `publishing_id: None` and
/// `AmqpMessage::default()`.
#[derive(Debug, PartialEq, Eq, Default)]
pub struct InternalMessage {
/// Optional publishing id; exposed through `Message::publishing_id`.
/// `Message::decode` always leaves this as `None`.
pub(crate) publishing_id: Option<u64>,
/// The wrapped AMQP message that the `Message` accessors and the
/// `Encoder` impl delegate to.
pub(crate) message: AmqpMessage,
}
impl Encoder for Message {
fn encoded_size(&self) -> u32 {
self.0.message.encoded_size()
}
fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> {
self.0.message.encode(writer)?;
Ok(())
}
}
impl Message {
#[cfg(test)]
pub(crate) fn new(internal: InternalMessage) -> Message {
Message(Arc::new(internal))
}
pub fn builder() -> MessageBuilder {
MessageBuilder(InternalMessage {
message: AmqpMessage::default(),
publishing_id: None,
})
}
pub fn value_ref<'a, T>(&'a self) -> Result<Option<T>, DecodeError>
where
T: TryFrom<&'a Value, Error = DecodeError>,
{
self.0
.message
.body()
.value()
.map(|value| T::try_from(value))
.transpose()
}
pub fn data(&self) -> Option<&[u8]> {
self.0.message.body().data().map(|data| data.as_slice())
}
pub fn properties(&self) -> Option<&Properties> {
self.0.message.properties()
}
pub fn header(&self) -> Option<&Header> {
self.0.message.header()
}
pub fn message_annotations(&self) -> Option<&MessageAnnotations> {
self.0.message.message_annotations()
}
pub fn application_properties(&self) -> Option<&ApplicationProperties> {
self.0.message.application_properties()
}
pub fn delivery_annotations(&self) -> Option<&DeliveryAnnotations> {
self.0.message.delivery_annotations()
}
pub fn publishing_id(&self) -> Option<&u64> {
self.0.publishing_id.as_ref()
}
}
impl Decoder for Message {
    /// Decodes an AMQP message from the front of `input`.
    ///
    /// On success returns the remaining input as handed back by
    /// `AmqpMessage::decode`, together with a `Message` whose
    /// `publishing_id` is `None`.
    ///
    /// # Errors
    ///
    /// Propagates the failure from `AmqpMessage::decode`, converted into the
    /// crate-level decode error by `?`.
    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
        let (remaining, message) = AmqpMessage::decode(input)?;
        let internal = InternalMessage {
            publishing_id: None,
            message,
        };
        Ok((remaining, Message(Arc::new(internal))))
    }
}
/// Lets a single [`Message`] be used where a `Vec<Message>` is expected.
impl From<Message> for Vec<Message> {
    /// Returns a vector containing exactly `message`.
    fn from(message: Message) -> Self {
        let mut single = Vec::with_capacity(1);
        single.push(message);
        single
    }
}
/// Maps AMQP-level encode failures onto the crate-level [`EncodeError`].
impl From<AmqpEncodeError> for EncodeError {
    /// Carries the payload of `AmqpEncodeError::Io` over to `EncodeError::Io`.
    fn from(err: AmqpEncodeError) -> Self {
        // Kept as an exhaustive `match` so adding a variant to
        // `AmqpEncodeError` is a compile error here.
        match err {
            AmqpEncodeError::Io(source) => Self::Io(source),
        }
    }
}
/// Maps AMQP-level decode failures onto the crate-level [`DecodeError`].
impl From<AmqpDecodeError> for DecodeError {
    /// Converts each `AmqpDecodeError` variant into a `DecodeError`.
    ///
    /// `Incomplete`, `Utf8Error` and `MessageParse` keep their payload under
    /// the same variant name, and `InvalidTypeCode` becomes
    /// `InvalidFormatCode`. `UuidError` and `InvalidTypeCodeFor` are folded
    /// into `MessageParse` as text.
    fn from(err: AmqpDecodeError) -> Self {
        match err {
            // Payload carried across unchanged.
            AmqpDecodeError::Incomplete(inner) => Self::Incomplete(inner),
            AmqpDecodeError::Utf8Error(inner) => Self::Utf8Error(inner),
            AmqpDecodeError::MessageParse(reason) => Self::MessageParse(reason),
            AmqpDecodeError::InvalidTypeCode(type_code) => Self::InvalidFormatCode(type_code),

            // Reported as a parse failure carrying a textual description.
            AmqpDecodeError::UuidError(uuid_err) => Self::MessageParse(uuid_err.to_string()),
            AmqpDecodeError::InvalidTypeCodeFor { target, code } => {
                Self::MessageParse(format!("Invalid type code {:?} for {}", code, target))
            }
        }
    }
}