1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// SPDX-License-Identifier: Apache-2.0 AND MIT
//! `Message` struct, `MessagePeek` and `MessageProcess` trait
use async_trait::async_trait;
use lapin;

/// A zero-cost [lapin::message::Delivery] [newtype].
///
/// [lapin::message::Delivery]: https://docs.rs/lapin/latest/lapin/message/struct.Delivery.html
/// [newtype]: https://doc.rust-lang.org/1.0.0/style/features/types/newtype.html
pub struct Message(lapin::message::Delivery);

/// Error actions used both by [MessagePeek] and [MessageProcess]
/// trait implementations.
///
/// [MessagePeek]: trait.MessagePeek.html
/// [MessageProcess]: trait.MessageProcess.html
pub enum MessageError {
    /// Silently drop a message.
    Drop,
    /// Reject a message.
    Reject,
    /// Nack a message.
    Nack,
}

impl Message {
    #[inline]
    pub fn new(delivery: lapin::message::Delivery) -> Self {
        Self(delivery)
    }
    #[inline]
    pub fn data(&self) -> &[u8] {
        &self.0.data
    }
    #[inline]
    pub fn delivery_tag(&self) -> u64 {
        self.0.delivery_tag
    }
    #[inline]
    pub fn reply_to(&self) -> Option<&str> {
        self.0
            .properties
            .reply_to()
            .as_ref()
            .map(|str| str.as_str())
    }
}

/// A trait to peek the [Message] and returns success or error.
///
/// [Message]: struct.Message.html
#[async_trait]
pub trait MessagePeek {
    /// Async method to peek a message.
    async fn peek(&mut self, msg: &Message) -> Result<(), MessageError>;
    fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync>;
}

// https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6
impl Clone for Box<dyn MessagePeek + Send + Sync> {
    fn clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
        self.boxed_clone()
    }
}

/// A trait to process the [Message] and returns the response data
/// or modified data.
///
/// [Message]: struct.Message.html
#[async_trait]
pub trait MessageProcess {
    /// Async method to process a message.
    async fn process(&mut self, msg: &Message) -> Result<Vec<u8>, MessageError>;
    fn boxed_clone(&self) -> Box<dyn MessageProcess + Send + Sync>;
}

// https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6
impl Clone for Box<dyn MessageProcess + Send + Sync> {
    fn clone(&self) -> Box<dyn MessageProcess + Send + Sync> {
        self.boxed_clone()
    }
}

/// A [MessagePeek] implementation which does nothing.
///
/// [MessagePeek]: trait.MessagePeek.html
#[derive(Clone)]
pub struct NoopPeeker;

#[async_trait]
impl MessagePeek for NoopPeeker {
    /// Echoe back the request message.
    async fn peek(&mut self, _msg: &Message) -> Result<(), MessageError> {
        Ok(())
    }
    fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
        Box::new((*self).clone())
    }
}

/// A [MessagePeek] implementation which reject a message.
///
/// [MessagePeek]: trait.MessagePeek.html
#[derive(Clone)]
struct RejectPeeker;

#[async_trait]
impl MessagePeek for RejectPeeker {
    /// Just returns the error saying to drop a message.
    /// to the console.  This is good for the benchmarking.
    async fn peek(&mut self, _msg: &Message) -> Result<(), MessageError> {
        Err(MessageError::Reject)
    }
    fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
        Box::new((*self).clone())
    }
}

/// A [MessageProcess] implementation which echoes back the original message.
///
/// [MessageProcess]: trait.MessageProcess.html
#[derive(Clone)]
pub struct EchoProcessor;

#[async_trait]
impl MessageProcess for EchoProcessor {
    /// Echoe back the request message.
    async fn process(&mut self, msg: &Message) -> Result<Vec<u8>, MessageError> {
        Ok(msg.data().to_vec())
    }
    fn boxed_clone(&self) -> Box<dyn MessageProcess + Send + Sync> {
        Box::new((*self).clone())
    }
}