async_mq/
message.rs

1// SPDX-License-Identifier: Apache-2.0 AND MIT
2//! `Message` struct, `MessagePeek` and `MessageProcess` trait
3use async_trait::async_trait;
4
5/// A zero-cost [lapin::message::Delivery] [newtype].
6///
7/// [lapin::message::Delivery]: https://docs.rs/lapin/latest/lapin/message/struct.Delivery.html
8/// [newtype]: https://doc.rust-lang.org/1.0.0/style/features/types/newtype.html
9pub struct Message(lapin::message::Delivery);
10
11/// Error actions used both by [MessagePeek] and [MessageProcess]
12/// trait implementations.
13///
14/// [MessagePeek]: trait.MessagePeek.html
15/// [MessageProcess]: trait.MessageProcess.html
16pub enum MessageError {
17    /// Silently drop a message.
18    Drop,
19    /// Reject a message.
20    Reject,
21    /// Nack a message.
22    Nack,
23}
24
25impl Message {
26    #[inline]
27    pub fn new(delivery: lapin::message::Delivery) -> Self {
28        Self(delivery)
29    }
30    #[inline]
31    pub fn data(&self) -> &[u8] {
32        &self.0.data
33    }
34    #[inline]
35    pub fn delivery_tag(&self) -> u64 {
36        self.0.delivery_tag
37    }
38    #[inline]
39    pub fn reply_to(&self) -> Option<&str> {
40        self.0
41            .properties
42            .reply_to()
43            .as_ref()
44            .map(|str| str.as_str())
45    }
46}
47
48/// A trait to peek the [Message] and returns success or error.
49///
50/// [Message]: struct.Message.html
51#[async_trait]
52pub trait MessagePeek {
53    /// Async method to peek a message.
54    async fn peek(&mut self, msg: &Message) -> Result<(), MessageError>;
55    fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync>;
56}
57
58// https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6
59impl Clone for Box<dyn MessagePeek + Send + Sync> {
60    fn clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
61        self.boxed_clone()
62    }
63}
64
65/// A trait to process the [Message] and returns the response data
66/// or modified data.
67///
68/// [Message]: struct.Message.html
69#[async_trait]
70pub trait MessageProcess {
71    /// Async method to process a message.
72    async fn process(&mut self, msg: &Message) -> Result<Vec<u8>, MessageError>;
73    fn boxed_clone(&self) -> Box<dyn MessageProcess + Send + Sync>;
74}
75
76// https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6
77impl Clone for Box<dyn MessageProcess + Send + Sync> {
78    fn clone(&self) -> Box<dyn MessageProcess + Send + Sync> {
79        self.boxed_clone()
80    }
81}
82
83/// A [MessagePeek] implementation which does nothing.
84///
85/// [MessagePeek]: trait.MessagePeek.html
86#[derive(Clone)]
87pub struct NoopPeeker;
88
89#[async_trait]
90impl MessagePeek for NoopPeeker {
91    /// Echoe back the request message.
92    async fn peek(&mut self, _msg: &Message) -> Result<(), MessageError> {
93        Ok(())
94    }
95    fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
96        Box::new((*self).clone())
97    }
98}
99
100/// A [MessagePeek] implementation which reject a message.
101///
102/// [MessagePeek]: trait.MessagePeek.html
103#[derive(Clone)]
104struct RejectPeeker;
105
106#[async_trait]
107impl MessagePeek for RejectPeeker {
108    /// Just returns the error saying to drop a message.
109    /// to the console.  This is good for the benchmarking.
110    async fn peek(&mut self, _msg: &Message) -> Result<(), MessageError> {
111        Err(MessageError::Reject)
112    }
113    fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
114        Box::new((*self).clone())
115    }
116}
117
118/// A [MessageProcess] implementation which echoes back the original message.
119///
120/// [MessageProcess]: trait.MessageProcess.html
121#[derive(Clone)]
122pub struct EchoProcessor;
123
124#[async_trait]
125impl MessageProcess for EchoProcessor {
126    /// Echoe back the request message.
127    async fn process(&mut self, msg: &Message) -> Result<Vec<u8>, MessageError> {
128        Ok(msg.data().to_vec())
129    }
130    fn boxed_clone(&self) -> Box<dyn MessageProcess + Send + Sync> {
131        Box::new((*self).clone())
132    }
133}