ntex-amqp-codec 0.9.0

AMQP 1.0 Protocol Codec
Documentation
use std::fmt;

use chrono::{DateTime, Utc};
use derive_more::From;
use ntex_bytes::{BufMut, ByteString, Bytes, BytesMut};
use uuid::Uuid;

use super::codec::{self, Decode, DecodeFormatted, Encode};
use crate::{error::AmqpParseError, message::Message, types::*, HashMap};

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

#[derive(Debug)]
pub(crate) struct CompoundHeader {
    pub size: u32,
    pub count: u32,
}

impl CompoundHeader {
    pub fn empty() -> CompoundHeader {
        CompoundHeader { size: 0, count: 0 }
    }
}

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum ProtocolId {
    Amqp = 0,
    AmqpTls = 2,
    AmqpSasl = 3,
}

pub type Map = HashMap<Variant, Variant>;
pub type StringVariantMap = HashMap<Str, Variant>;
pub type Fields = HashMap<Symbol, Variant>;
pub type FilterSet = HashMap<Symbol, Option<ByteString>>;
pub type FieldsVec = VecSymbolMap;
pub type Timestamp = DateTime<Utc>;
pub type Symbols = Multiple<Symbol>;
pub type IetfLanguageTags = Multiple<IetfLanguageTag>;
pub type Annotations = HashMap<Symbol, Variant>;

#[allow(
    clippy::unreadable_literal,
    clippy::match_bool,
    clippy::large_enum_variant
)]
#[cfg(not(tarpaulin_include))]
mod definitions;
#[cfg(not(tarpaulin_include))]
pub use self::definitions::*;

#[derive(Debug, Eq, PartialEq, Clone, From, Display)]
pub enum MessageId {
    #[display(fmt = "{}", _0)]
    Ulong(u64),
    #[display(fmt = "{}", _0)]
    Uuid(Uuid),
    #[display(fmt = "{:?}", _0)]
    Binary(Bytes),
    #[display(fmt = "{}", _0)]
    String(ByteString),
}

impl From<usize> for MessageId {
    fn from(id: usize) -> MessageId {
        MessageId::Ulong(id as u64)
    }
}

impl From<i32> for MessageId {
    fn from(id: i32) -> MessageId {
        MessageId::Ulong(id as u64)
    }
}

impl DecodeFormatted for MessageId {
    fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
        match fmt {
            codec::FORMATCODE_SMALLULONG | codec::FORMATCODE_ULONG | codec::FORMATCODE_ULONG_0 => {
                u64::decode_with_format(input, fmt).map(MessageId::Ulong)
            }
            codec::FORMATCODE_UUID => Uuid::decode_with_format(input, fmt).map(MessageId::Uuid),
            codec::FORMATCODE_BINARY8 | codec::FORMATCODE_BINARY32 => {
                Bytes::decode_with_format(input, fmt).map(MessageId::Binary)
            }
            codec::FORMATCODE_STRING8 | codec::FORMATCODE_STRING32 => {
                ByteString::decode_with_format(input, fmt).map(MessageId::String)
            }
            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
        }
    }
}

impl Encode for MessageId {
    fn encoded_size(&self) -> usize {
        match *self {
            MessageId::Ulong(v) => v.encoded_size(),
            MessageId::Uuid(ref v) => v.encoded_size(),
            MessageId::Binary(ref v) => v.encoded_size(),
            MessageId::String(ref v) => v.encoded_size(),
        }
    }

    fn encode(&self, buf: &mut BytesMut) {
        match *self {
            MessageId::Ulong(v) => v.encode(buf),
            MessageId::Uuid(ref v) => v.encode(buf),
            MessageId::Binary(ref v) => v.encode(buf),
            MessageId::String(ref v) => v.encode(buf),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq, From)]
pub enum ErrorCondition {
    AmqpError(AmqpError),
    ConnectionError(ConnectionError),
    SessionError(SessionError),
    LinkError(LinkError),
    Custom(Symbol),
}

impl Default for ErrorCondition {
    fn default() -> ErrorCondition {
        ErrorCondition::Custom(Symbol(Str::from("Unknown")))
    }
}

impl DecodeFormatted for ErrorCondition {
    #[inline]
    fn decode_with_format(input: &mut Bytes, format: u8) -> Result<Self, AmqpParseError> {
        let result = Symbol::decode_with_format(input, format)?;
        if let Ok(r) = AmqpError::try_from(&result) {
            return Ok(ErrorCondition::AmqpError(r));
        }
        if let Ok(r) = ConnectionError::try_from(&result) {
            return Ok(ErrorCondition::ConnectionError(r));
        }
        if let Ok(r) = SessionError::try_from(&result) {
            return Ok(ErrorCondition::SessionError(r));
        }
        if let Ok(r) = LinkError::try_from(&result) {
            return Ok(ErrorCondition::LinkError(r));
        }
        Ok(ErrorCondition::Custom(result))
    }
}

impl Encode for ErrorCondition {
    fn encoded_size(&self) -> usize {
        match *self {
            ErrorCondition::AmqpError(ref v) => v.encoded_size(),
            ErrorCondition::ConnectionError(ref v) => v.encoded_size(),
            ErrorCondition::SessionError(ref v) => v.encoded_size(),
            ErrorCondition::LinkError(ref v) => v.encoded_size(),
            ErrorCondition::Custom(ref v) => v.encoded_size(),
        }
    }

    fn encode(&self, buf: &mut BytesMut) {
        match *self {
            ErrorCondition::AmqpError(ref v) => v.encode(buf),
            ErrorCondition::ConnectionError(ref v) => v.encode(buf),
            ErrorCondition::SessionError(ref v) => v.encode(buf),
            ErrorCondition::LinkError(ref v) => v.encode(buf),
            ErrorCondition::Custom(ref v) => v.encode(buf),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DistributionMode {
    Move,
    Copy,
    Custom(Symbol),
}

impl DecodeFormatted for DistributionMode {
    fn decode_with_format(input: &mut Bytes, format: u8) -> Result<Self, AmqpParseError> {
        let result = Symbol::decode_with_format(input, format)?;
        let result = match result.as_str() {
            "move" => DistributionMode::Move,
            "copy" => DistributionMode::Copy,
            _ => DistributionMode::Custom(result),
        };
        Ok(result)
    }
}

impl Encode for DistributionMode {
    fn encoded_size(&self) -> usize {
        match *self {
            DistributionMode::Move => 6,
            DistributionMode::Copy => 6,
            DistributionMode::Custom(ref v) => v.encoded_size(),
        }
    }

    fn encode(&self, buf: &mut BytesMut) {
        match *self {
            DistributionMode::Move => Symbol::from("move").encode(buf),
            DistributionMode::Copy => Symbol::from("copy").encode(buf),
            DistributionMode::Custom(ref v) => v.encode(buf),
        }
    }
}

impl SaslInit {
    pub fn prepare_response(authz_id: &str, authn_id: &str, password: &str) -> Bytes {
        Bytes::from(format!("{}\x00{}\x00{}", authz_id, authn_id, password))
    }
}

#[allow(clippy::derivable_impls)]
impl Default for Properties {
    fn default() -> Properties {
        Properties {
            message_id: None,
            user_id: None,
            to: None,
            subject: None,
            reply_to: None,
            correlation_id: None,
            content_type: None,
            content_encoding: None,
            absolute_expiry_time: None,
            creation_time: None,
            group_id: None,
            group_sequence: None,
            reply_to_group_id: None,
        }
    }
}

#[derive(Debug, Clone, From, PartialEq, Eq)]
pub enum TransferBody {
    Data(Bytes),
    Message(Message),
}

impl TransferBody {
    #[inline]
    pub fn len(&self) -> usize {
        self.encoded_size()
    }

    #[inline]
    pub fn message_format(&self) -> Option<MessageFormat> {
        match self {
            TransferBody::Data(_) => None,
            TransferBody::Message(ref data) => data.0.message_format,
        }
    }
}

impl Encode for TransferBody {
    #[inline]
    fn encoded_size(&self) -> usize {
        match self {
            TransferBody::Data(ref data) => data.len(),
            TransferBody::Message(ref data) => data.encoded_size(),
        }
    }
    #[inline]
    fn encode(&self, dst: &mut BytesMut) {
        match *self {
            TransferBody::Data(ref data) => dst.put_slice(data),
            TransferBody::Message(ref data) => data.encode(dst),
        }
    }
}

impl Transfer {
    #[inline]
    pub fn get_body(&self) -> Option<&Bytes> {
        match self.body() {
            Some(TransferBody::Data(ref b)) => Some(b),
            _ => None,
        }
    }

    #[inline]
    pub fn load_message<T: Decode>(&self) -> Result<T, AmqpParseError> {
        if let Some(TransferBody::Data(ref b)) = self.body() {
            Ok(T::decode(&mut b.clone())?)
        } else {
            Err(AmqpParseError::UnexpectedType("body"))
        }
    }
}

impl Default for Role {
    fn default() -> Role {
        Role::Sender
    }
}

impl Default for SenderSettleMode {
    fn default() -> SenderSettleMode {
        SenderSettleMode::Mixed
    }
}

impl Default for ReceiverSettleMode {
    fn default() -> ReceiverSettleMode {
        ReceiverSettleMode::First
    }
}

#[cfg(test)]
mod tests {
    use ntex_bytes::BytesMut;
    use uuid::Uuid;

    use super::*;
    use crate::codec::{Decode, Encode};
    use crate::error::AmqpCodecError;

    #[test]
    fn test_message_id() -> Result<(), AmqpCodecError> {
        let id = MessageId::Uuid(Uuid::new_v4());

        let mut buf = BytesMut::new();
        buf.reserve(id.encoded_size());
        id.encode(&mut buf);

        let new_id = MessageId::decode(&mut buf.freeze())?;
        assert_eq!(id, new_id);
        Ok(())
    }

    #[test]
    fn test_properties() -> Result<(), AmqpCodecError> {
        let id = Uuid::new_v4();
        let mut props = Properties::default();
        props.correlation_id = Some(id.into());

        let mut buf = BytesMut::new();
        buf.reserve(id.encoded_size());
        props.encode(&mut buf);

        let props2 = Properties::decode(&mut buf.freeze())?;
        assert_eq!(props, props2);
        Ok(())
    }
}