1use async_trait::async_trait;
4
5pub struct Message(lapin::message::Delivery);
10
/// Failure outcomes a message handler can report for a single message.
///
/// The enum is field-less, so it derives the cheap common traits: `Debug`
/// (required for `Result::unwrap`/`expect` and for logging), `Clone`/`Copy`,
/// and `PartialEq`/`Eq` so callers and tests can compare outcomes directly.
///
/// NOTE(review): the exact broker-side action taken for each variant is
/// decided by the consuming code, which is not visible in this file — the
/// per-variant notes below are inferred from the names and should be
/// confirmed against the consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageError {
    /// Presumably: discard the message — TODO confirm against the consumer.
    Drop,
    /// Presumably: reject the delivery — TODO confirm against the consumer.
    Reject,
    /// Presumably: negatively acknowledge the delivery — TODO confirm
    /// against the consumer.
    Nack,
}
24
25impl Message {
26 #[inline]
27 pub fn new(delivery: lapin::message::Delivery) -> Self {
28 Self(delivery)
29 }
30 #[inline]
31 pub fn data(&self) -> &[u8] {
32 &self.0.data
33 }
34 #[inline]
35 pub fn delivery_tag(&self) -> u64 {
36 self.0.delivery_tag
37 }
38 #[inline]
39 pub fn reply_to(&self) -> Option<&str> {
40 self.0
41 .properties
42 .reply_to()
43 .as_ref()
44 .map(|str| str.as_str())
45 }
46}
47
48#[async_trait]
52pub trait MessagePeek {
53 async fn peek(&mut self, msg: &Message) -> Result<(), MessageError>;
55 fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync>;
56}
57
58impl Clone for Box<dyn MessagePeek + Send + Sync> {
60 fn clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
61 self.boxed_clone()
62 }
63}
64
65#[async_trait]
70pub trait MessageProcess {
71 async fn process(&mut self, msg: &Message) -> Result<Vec<u8>, MessageError>;
73 fn boxed_clone(&self) -> Box<dyn MessageProcess + Send + Sync>;
74}
75
76impl Clone for Box<dyn MessageProcess + Send + Sync> {
78 fn clone(&self) -> Box<dyn MessageProcess + Send + Sync> {
79 self.boxed_clone()
80 }
81}
82
83#[derive(Clone)]
87pub struct NoopPeeker;
88
89#[async_trait]
90impl MessagePeek for NoopPeeker {
91 async fn peek(&mut self, _msg: &Message) -> Result<(), MessageError> {
93 Ok(())
94 }
95 fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
96 Box::new((*self).clone())
97 }
98}
99
100#[derive(Clone)]
104struct RejectPeeker;
105
106#[async_trait]
107impl MessagePeek for RejectPeeker {
108 async fn peek(&mut self, _msg: &Message) -> Result<(), MessageError> {
111 Err(MessageError::Reject)
112 }
113 fn boxed_clone(&self) -> Box<dyn MessagePeek + Send + Sync> {
114 Box::new((*self).clone())
115 }
116}
117
118#[derive(Clone)]
122pub struct EchoProcessor;
123
124#[async_trait]
125impl MessageProcess for EchoProcessor {
126 async fn process(&mut self, msg: &Message) -> Result<Vec<u8>, MessageError> {
128 Ok(msg.data().to_vec())
129 }
130 fn boxed_clone(&self) -> Box<dyn MessageProcess + Send + Sync> {
131 Box::new((*self).clone())
132 }
133}