Skip to main content

vld_lapin/
lib.rs

1//! # vld-lapin
2//!
3//! Lapin (RabbitMQ) integration for `vld`.
4//!
5//! ## Overview
6//!
7//! `vld-lapin` keeps one entrypoint macro:
8//!
9//! - `impl_to_lapin!(channel)`
10//!
11//! After rebinding, `channel` becomes a validating wrapper with:
12//!
13//! - publish helper: `publish`
14//! - get/decode helpers: `basic_get`, `decode_bytes`, `decode_delivery`, `decode_get`
15//! - ack helpers: `ack_decode`, `nack_decode`, `reject_decode`
16//! - get+ack helpers: `ack_decode_get`, `nack_decode_get`, `reject_decode_get`
17//!
18//! All other native `lapin::Channel` methods are still available through deref.
19
20use std::fmt;
21use std::ops::{Deref, DerefMut};
22
23pub use vld;
24
25/// Lapin channel wrapper with validate+JSON behavior.
26pub struct LapinChannel {
27    inner: lapin::Channel,
28}
29
30impl LapinChannel {
31    pub fn new(inner: lapin::Channel) -> Self {
32        Self { inner }
33    }
34
35    pub fn into_inner(self) -> lapin::Channel {
36        self.inner
37    }
38
39    fn encode_payload<V>(value: &V) -> Result<Vec<u8>, VldLapinError>
40    where
41        V: serde::Serialize + vld::schema::VldParse,
42    {
43        let json =
44            serde_json::to_value(value).map_err(|e| VldLapinError::Serialization(e.to_string()))?;
45        <V as vld::schema::VldParse>::vld_parse_value(&json).map_err(VldLapinError::Validation)?;
46        serde_json::to_vec(&json).map_err(|e| VldLapinError::Serialization(e.to_string()))
47    }
48
49    pub fn decode_bytes<T>(&self, payload: &[u8]) -> Result<T, VldLapinError>
50    where
51        T: vld::schema::VldParse,
52    {
53        let value: serde_json::Value = serde_json::from_slice(payload)
54            .map_err(|e| VldLapinError::Deserialization(e.to_string()))?;
55        <T as vld::schema::VldParse>::vld_parse_value(&value).map_err(VldLapinError::Validation)
56    }
57
58    pub async fn queue_declare(
59        &self,
60        queue: &str,
61        options: lapin::options::QueueDeclareOptions,
62        arguments: lapin::types::FieldTable,
63    ) -> Result<lapin::Queue, VldLapinError> {
64        self.inner
65            .queue_declare(queue, options, arguments)
66            .await
67            .map_err(VldLapinError::Lapin)
68    }
69
70    pub async fn basic_get(
71        &self,
72        queue: &str,
73        options: lapin::options::BasicGetOptions,
74    ) -> Result<Option<lapin::message::BasicGetMessage>, VldLapinError> {
75        self.inner
76            .basic_get(queue, options)
77            .await
78            .map_err(VldLapinError::Lapin)
79    }
80
81    pub async fn publish<V>(
82        &self,
83        exchange: &str,
84        routing_key: &str,
85        options: lapin::options::BasicPublishOptions,
86        properties: lapin::BasicProperties,
87        value: &V,
88    ) -> Result<lapin::publisher_confirm::Confirmation, VldLapinError>
89    where
90        V: serde::Serialize + vld::schema::VldParse,
91    {
92        let payload = Self::encode_payload(value)?;
93
94        let confirm = self
95            .inner
96            .basic_publish(exchange, routing_key, options, &payload, properties)
97            .await
98            .map_err(VldLapinError::Lapin)?;
99        confirm.await.map_err(VldLapinError::Lapin)
100    }
101
102    pub fn decode_delivery<T>(
103        &self,
104        delivery: &lapin::message::Delivery,
105    ) -> Result<T, VldLapinError>
106    where
107        T: vld::schema::VldParse,
108    {
109        self.decode_bytes(&delivery.data)
110    }
111
112    pub fn decode_get<T>(
113        &self,
114        message: &lapin::message::BasicGetMessage,
115    ) -> Result<T, VldLapinError>
116    where
117        T: vld::schema::VldParse,
118    {
119        self.decode_delivery(message)
120    }
121
122    pub async fn ack_decode<T>(
123        &self,
124        delivery: &lapin::message::Delivery,
125        options: lapin::options::BasicAckOptions,
126    ) -> Result<T, VldLapinError>
127    where
128        T: vld::schema::VldParse,
129    {
130        let parsed = self.decode_delivery(delivery)?;
131        delivery.ack(options).await.map_err(VldLapinError::Lapin)?;
132        Ok(parsed)
133    }
134
135    pub async fn nack_decode<T>(
136        &self,
137        delivery: &lapin::message::Delivery,
138        options: lapin::options::BasicNackOptions,
139    ) -> Result<T, VldLapinError>
140    where
141        T: vld::schema::VldParse,
142    {
143        let parsed = self.decode_delivery(delivery)?;
144        delivery.nack(options).await.map_err(VldLapinError::Lapin)?;
145        Ok(parsed)
146    }
147
148    pub async fn reject_decode<T>(
149        &self,
150        delivery: &lapin::message::Delivery,
151        options: lapin::options::BasicRejectOptions,
152    ) -> Result<T, VldLapinError>
153    where
154        T: vld::schema::VldParse,
155    {
156        let parsed = self.decode_delivery(delivery)?;
157        delivery
158            .reject(options)
159            .await
160            .map_err(VldLapinError::Lapin)?;
161        Ok(parsed)
162    }
163
164    pub async fn ack_decode_get<T>(
165        &self,
166        message: &lapin::message::BasicGetMessage,
167        options: lapin::options::BasicAckOptions,
168    ) -> Result<T, VldLapinError>
169    where
170        T: vld::schema::VldParse,
171    {
172        self.ack_decode(message, options).await
173    }
174
175    pub async fn nack_decode_get<T>(
176        &self,
177        message: &lapin::message::BasicGetMessage,
178        options: lapin::options::BasicNackOptions,
179    ) -> Result<T, VldLapinError>
180    where
181        T: vld::schema::VldParse,
182    {
183        self.nack_decode(message, options).await
184    }
185
186    pub async fn reject_decode_get<T>(
187        &self,
188        message: &lapin::message::BasicGetMessage,
189        options: lapin::options::BasicRejectOptions,
190    ) -> Result<T, VldLapinError>
191    where
192        T: vld::schema::VldParse,
193    {
194        self.reject_decode(message, options).await
195    }
196}
197
198impl Deref for LapinChannel {
199    type Target = lapin::Channel;
200
201    fn deref(&self) -> &Self::Target {
202        &self.inner
203    }
204}
205
206impl DerefMut for LapinChannel {
207    fn deref_mut(&mut self) -> &mut Self::Target {
208        &mut self.inner
209    }
210}
211
212/// Error type for `vld-lapin`.
213#[derive(Debug)]
214pub enum VldLapinError {
215    Validation(vld::error::VldError),
216    Serialization(String),
217    Deserialization(String),
218    Lapin(lapin::Error),
219}
220
221impl fmt::Display for VldLapinError {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            VldLapinError::Validation(e) => write!(f, "Validation error: {e}"),
225            VldLapinError::Serialization(e) => write!(f, "Serialization error: {e}"),
226            VldLapinError::Deserialization(e) => write!(f, "Deserialization error: {e}"),
227            VldLapinError::Lapin(e) => write!(f, "Lapin error: {e}"),
228        }
229    }
230}
231
232impl std::error::Error for VldLapinError {
233    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
234        match self {
235            VldLapinError::Validation(e) => Some(e),
236            VldLapinError::Lapin(e) => Some(e),
237            VldLapinError::Serialization(_) | VldLapinError::Deserialization(_) => None,
238        }
239    }
240}
241
242impl From<vld::error::VldError> for VldLapinError {
243    fn from(value: vld::error::VldError) -> Self {
244        Self::Validation(value)
245    }
246}
247
248impl From<lapin::Error> for VldLapinError {
249    fn from(value: lapin::Error) -> Self {
250        Self::Lapin(value)
251    }
252}
253
/// Rebinds a `lapin::Channel` variable as a validating [`LapinChannel`].
///
/// `impl_to_lapin!(channel)` expands to a `let` statement that shadows
/// `channel` with `LapinChannel::new(channel)`. The original binding is moved
/// into the wrapper.
#[macro_export]
macro_rules! impl_to_lapin {
    ($channel:ident) => {
        let $channel: $crate::LapinChannel = $crate::LapinChannel::new($channel);
    };
}
260
261pub mod prelude {
262    pub use crate::{impl_to_lapin, LapinChannel, VldLapinError};
263    pub use vld::prelude::*;
264}