1use std::fmt;
21use std::ops::{Deref, DerefMut};
22
23pub use vld;
24
25pub struct LapinChannel {
27 inner: lapin::Channel,
28}
29
30impl LapinChannel {
31 pub fn new(inner: lapin::Channel) -> Self {
32 Self { inner }
33 }
34
35 pub fn into_inner(self) -> lapin::Channel {
36 self.inner
37 }
38
39 fn encode_payload<V>(value: &V) -> Result<Vec<u8>, VldLapinError>
40 where
41 V: serde::Serialize + vld::schema::VldParse,
42 {
43 let json =
44 serde_json::to_value(value).map_err(|e| VldLapinError::Serialization(e.to_string()))?;
45 <V as vld::schema::VldParse>::vld_parse_value(&json).map_err(VldLapinError::Validation)?;
46 serde_json::to_vec(&json).map_err(|e| VldLapinError::Serialization(e.to_string()))
47 }
48
49 pub fn decode_bytes<T>(&self, payload: &[u8]) -> Result<T, VldLapinError>
50 where
51 T: vld::schema::VldParse,
52 {
53 let value: serde_json::Value = serde_json::from_slice(payload)
54 .map_err(|e| VldLapinError::Deserialization(e.to_string()))?;
55 <T as vld::schema::VldParse>::vld_parse_value(&value).map_err(VldLapinError::Validation)
56 }
57
58 pub async fn queue_declare(
59 &self,
60 queue: &str,
61 options: lapin::options::QueueDeclareOptions,
62 arguments: lapin::types::FieldTable,
63 ) -> Result<lapin::Queue, VldLapinError> {
64 self.inner
65 .queue_declare(queue, options, arguments)
66 .await
67 .map_err(VldLapinError::Lapin)
68 }
69
70 pub async fn basic_get(
71 &self,
72 queue: &str,
73 options: lapin::options::BasicGetOptions,
74 ) -> Result<Option<lapin::message::BasicGetMessage>, VldLapinError> {
75 self.inner
76 .basic_get(queue, options)
77 .await
78 .map_err(VldLapinError::Lapin)
79 }
80
81 pub async fn publish<V>(
82 &self,
83 exchange: &str,
84 routing_key: &str,
85 options: lapin::options::BasicPublishOptions,
86 properties: lapin::BasicProperties,
87 value: &V,
88 ) -> Result<lapin::publisher_confirm::Confirmation, VldLapinError>
89 where
90 V: serde::Serialize + vld::schema::VldParse,
91 {
92 let payload = Self::encode_payload(value)?;
93
94 let confirm = self
95 .inner
96 .basic_publish(exchange, routing_key, options, &payload, properties)
97 .await
98 .map_err(VldLapinError::Lapin)?;
99 confirm.await.map_err(VldLapinError::Lapin)
100 }
101
102 pub fn decode_delivery<T>(
103 &self,
104 delivery: &lapin::message::Delivery,
105 ) -> Result<T, VldLapinError>
106 where
107 T: vld::schema::VldParse,
108 {
109 self.decode_bytes(&delivery.data)
110 }
111
112 pub fn decode_get<T>(
113 &self,
114 message: &lapin::message::BasicGetMessage,
115 ) -> Result<T, VldLapinError>
116 where
117 T: vld::schema::VldParse,
118 {
119 self.decode_delivery(message)
120 }
121
122 pub async fn ack_decode<T>(
123 &self,
124 delivery: &lapin::message::Delivery,
125 options: lapin::options::BasicAckOptions,
126 ) -> Result<T, VldLapinError>
127 where
128 T: vld::schema::VldParse,
129 {
130 let parsed = self.decode_delivery(delivery)?;
131 delivery.ack(options).await.map_err(VldLapinError::Lapin)?;
132 Ok(parsed)
133 }
134
135 pub async fn nack_decode<T>(
136 &self,
137 delivery: &lapin::message::Delivery,
138 options: lapin::options::BasicNackOptions,
139 ) -> Result<T, VldLapinError>
140 where
141 T: vld::schema::VldParse,
142 {
143 let parsed = self.decode_delivery(delivery)?;
144 delivery.nack(options).await.map_err(VldLapinError::Lapin)?;
145 Ok(parsed)
146 }
147
148 pub async fn reject_decode<T>(
149 &self,
150 delivery: &lapin::message::Delivery,
151 options: lapin::options::BasicRejectOptions,
152 ) -> Result<T, VldLapinError>
153 where
154 T: vld::schema::VldParse,
155 {
156 let parsed = self.decode_delivery(delivery)?;
157 delivery
158 .reject(options)
159 .await
160 .map_err(VldLapinError::Lapin)?;
161 Ok(parsed)
162 }
163
164 pub async fn ack_decode_get<T>(
165 &self,
166 message: &lapin::message::BasicGetMessage,
167 options: lapin::options::BasicAckOptions,
168 ) -> Result<T, VldLapinError>
169 where
170 T: vld::schema::VldParse,
171 {
172 self.ack_decode(message, options).await
173 }
174
175 pub async fn nack_decode_get<T>(
176 &self,
177 message: &lapin::message::BasicGetMessage,
178 options: lapin::options::BasicNackOptions,
179 ) -> Result<T, VldLapinError>
180 where
181 T: vld::schema::VldParse,
182 {
183 self.nack_decode(message, options).await
184 }
185
186 pub async fn reject_decode_get<T>(
187 &self,
188 message: &lapin::message::BasicGetMessage,
189 options: lapin::options::BasicRejectOptions,
190 ) -> Result<T, VldLapinError>
191 where
192 T: vld::schema::VldParse,
193 {
194 self.reject_decode(message, options).await
195 }
196}
197
198impl Deref for LapinChannel {
199 type Target = lapin::Channel;
200
201 fn deref(&self) -> &Self::Target {
202 &self.inner
203 }
204}
205
206impl DerefMut for LapinChannel {
207 fn deref_mut(&mut self) -> &mut Self::Target {
208 &mut self.inner
209 }
210}
211
212#[derive(Debug)]
214pub enum VldLapinError {
215 Validation(vld::error::VldError),
216 Serialization(String),
217 Deserialization(String),
218 Lapin(lapin::Error),
219}
220
221impl fmt::Display for VldLapinError {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 VldLapinError::Validation(e) => write!(f, "Validation error: {e}"),
225 VldLapinError::Serialization(e) => write!(f, "Serialization error: {e}"),
226 VldLapinError::Deserialization(e) => write!(f, "Deserialization error: {e}"),
227 VldLapinError::Lapin(e) => write!(f, "Lapin error: {e}"),
228 }
229 }
230}
231
232impl std::error::Error for VldLapinError {
233 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
234 match self {
235 VldLapinError::Validation(e) => Some(e),
236 VldLapinError::Lapin(e) => Some(e),
237 VldLapinError::Serialization(_) | VldLapinError::Deserialization(_) => None,
238 }
239 }
240}
241
242impl From<vld::error::VldError> for VldLapinError {
243 fn from(value: vld::error::VldError) -> Self {
244 Self::Validation(value)
245 }
246}
247
248impl From<lapin::Error> for VldLapinError {
249 fn from(value: lapin::Error) -> Self {
250 Self::Lapin(value)
251 }
252}
253
254#[macro_export]
255macro_rules! impl_to_lapin {
256 ($channel:ident) => {
257 let $channel = $crate::LapinChannel::new($channel);
258 };
259}
260
261pub mod prelude {
262 pub use crate::{impl_to_lapin, LapinChannel, VldLapinError};
263 pub use vld::prelude::*;
264}