strut_rabbitmq/transport/inbound/
envelope.rs

1use crate::transport::inbound::delivery::{abandon_delivery, backwash_delivery, complete_delivery};
2use crate::util::{
3    Coerce, RetrieveAppId, RetrieveClusterId, RetrieveContentEncoding, RetrieveContentType,
4    RetrieveCorrelationId, RetrieveExpiration, RetrieveHeader, RetrieveKind, RetrieveMessageId,
5    RetrievePushMap, RetrieveReplyTo, RetrieveUserId,
6};
7use crate::{Decoder, DeliveryMode, DispatchBuilder};
8use lapin::acker::Acker;
9use lapin::message::Delivery;
10use lapin::protocol::basic::AMQPProperties;
11use lapin::types::{AMQPValue, ShortString};
12use parking_lot::Mutex as SyncMutex;
13use std::sync::Arc;
14use tracing::error;
15
16pub mod stack;
17
18/// Represents an **incoming** RabbitMQ message.
19///
20/// This class owns both the bytes of the original message’s payload and the
21/// decoded (e.g., deserialized) payload `T`, along with the implementation
22/// details of the original [`Delivery`].
23#[derive(Debug)]
24pub struct Envelope<T> {
25    /// The name of the subscriber that received this message.
26    subscriber: Arc<str>,
27    /// The original delivery tag.
28    delivery_tag: u64,
29    /// The original target exchange used to send the message.
30    exchange: ShortString,
31    /// The original routing key used to send the message.
32    routing_key: ShortString,
33    /// The original redelivery flag.
34    is_redelivered: bool,
35    /// The original properties.
36    properties: AMQPProperties,
37    /// The original bytes.
38    bytes: Vec<u8>,
39    /// The acker associated with this message (optional for cases where messages
40    /// are acked [automatically](crate::AckingBehavior::Auto)).
41    acker: SyncMutex<Option<Acker>>,
42    /// The decoded content of the underlying message, stored alongside its
43    /// original bytes.
44    payload: T,
45}
46
47/// Represents a failed attempt to create an [`Envelope`] from a [`Decoder`] and
48/// a [`Delivery`].
49pub(crate) struct DecoderError<D>
50where
51    D: Decoder,
52{
53    /// The original bytes that were not decoded.
54    pub(crate) bytes: Vec<u8>,
55    /// The acker of the original message.
56    pub(crate) acker: Option<Acker>,
57    /// The decoder error.
58    pub(crate) error: D::Error,
59}
60
61impl<T> Envelope<T> {
62    /// Attempts to create an envelope from the given [`Delivery`] using the
63    /// provided [`Decoder`] implementation for interpreting the message payload.
64    ///
65    /// The given `is_pending` flag indicates whether this message is not yet
66    /// finalized and thus, whether the [`Acker`] should be extracted and used.
67    pub(crate) fn try_from<D>(
68        subscriber: Arc<str>,
69        decoder: &D,
70        delivery: Delivery,
71        is_pending: bool,
72    ) -> Result<Envelope<T>, DecoderError<D>>
73    where
74        D: Decoder<Result = T>,
75    {
76        // Destructure inputs
77        let Delivery {
78            delivery_tag,
79            exchange,
80            routing_key,
81            redelivered: is_redelivered,
82            properties,
83            data: bytes,
84            acker,
85        } = delivery;
86        let acker = is_pending.then_some(acker);
87
88        // Attempt to decode the given bytes with the given decoder
89        match decoder.decode(&bytes) {
90            // Successfully decoded
91            Ok(payload) => Ok(Self {
92                subscriber,
93                delivery_tag,
94                exchange,
95                routing_key,
96                is_redelivered,
97                bytes,
98                properties,
99                acker: SyncMutex::new(acker),
100                payload,
101            }),
102
103            // Failed to decode
104            Err(error) => Err(DecoderError {
105                bytes,
106                acker,
107                error,
108            }),
109        }
110    }
111}
112
113impl<T> Envelope<T> {
114    /// Exposes the delivery tag of the underlying incoming message.
115    pub fn delivery_tag(&self) -> u64 {
116        self.delivery_tag
117    }
118
119    /// Exposes the original target exchange used to send the underlying
120    /// incoming message.
121    pub fn exchange(&self) -> &str {
122        self.exchange.as_str()
123    }
124
125    /// Exposes the original routing key used to send the underlying incoming
126    /// message.
127    pub fn routing_key(&self) -> &str {
128        self.routing_key.as_str()
129    }
130
131    /// Exposes the original redelivery flag of the underlying incoming message.
132    ///
133    /// A message is redelivered when it has been previously dropped without
134    /// [finalizing](FinalizationKind), or if it was previously explicitly
135    /// [backwashed](FinalizationKind::Backwash).
136    pub fn is_redelivered(&self) -> bool {
137        self.is_redelivered
138    }
139
140    /// Exposes the original bytes of this message.
141    pub fn bytes(&self) -> &[u8] {
142        &self.bytes
143    }
144
145    /// Exposes the decoded payload of this message.
146    ///
147    /// Most [decoders](Decoder) derive the payload from the original
148    /// [bytes](Envelope::bytes) of this message. A notable exceptions is the
149    /// [`NoopDecoder`](crate::NoopDecoder).
150    pub fn payload(&self) -> &T {
151        &self.payload
152    }
153
154    /// Exposes the attempt number of the underlying incoming message, if present.
155    pub fn attempt(&self) -> Option<u32> {
156        self.properties.retrieve_attempt()
157    }
158
159    /// Exposes the delivery mode of the underlying incoming message, if present.
160    pub fn delivery_mode(&self) -> Option<DeliveryMode> {
161        self.properties.retrieve_delivery_mode()
162    }
163
164    /// Exposes the priority of the underlying incoming message, if present.
165    pub fn priority(&self) -> Option<u8> {
166        self.properties.retrieve_priority()
167    }
168
169    /// Exposes the timestamp of the underlying incoming message, if present.
170    pub fn timestamp(&self) -> Option<u64> {
171        self.properties.retrieve_timestamp()
172    }
173}
174
175impl<T> Envelope<T> {
176    /// Reports the content type of this message, if present, [coercing](Coerce)
177    /// it into type `R`, if supported.
178    pub fn content_type<'a, R>(&'a self) -> Option<R>
179    where
180        ShortString: Coerce<'a, R>,
181    {
182        self.properties.retrieve_content_type()
183    }
184
185    /// Reports the content encoding of this message, if present,
186    /// [coercing](Coerce) it into type `R`, if supported.
187    pub fn content_encoding<'a, R>(&'a self) -> Option<R>
188    where
189        ShortString: Coerce<'a, R>,
190    {
191        self.properties.retrieve_content_encoding()
192    }
193
194    /// Reports the header value from this message by key, if present,
195    /// [coercing](Coerce) it into type `R`, if supported.
196    pub fn header<'a, R>(&'a self, key: &str) -> Option<R>
197    where
198        AMQPValue: Coerce<'a, R>,
199    {
200        self.properties.retrieve_header(key)
201    }
202
203    /// Reports the correlation ID of this message, if present,
204    /// [coercing](Coerce) it into type `R`, if supported.
205    pub fn correlation_id<'a, R>(&'a self) -> Option<R>
206    where
207        ShortString: Coerce<'a, R>,
208    {
209        self.properties.retrieve_correlation_id()
210    }
211
212    /// Reports the “reply-to” value of this message, if present,
213    /// [coercing](Coerce) it into type `R`, if supported.
214    pub fn reply_to<'a, R>(&'a self) -> Option<R>
215    where
216        ShortString: Coerce<'a, R>,
217    {
218        self.properties.retrieve_reply_to()
219    }
220
221    /// Reports the expiration of this message, if present, [coercing](Coerce)
222    /// it into type `R`, if supported.
223    pub fn expiration<'a, R>(&'a self) -> Option<R>
224    where
225        ShortString: Coerce<'a, R>,
226    {
227        self.properties.retrieve_expiration()
228    }
229
230    /// Reports the ID of this message, if present, [coercing](Coerce) it into
231    /// type `R`, if supported.
232    pub fn message_id<'a, R>(&'a self) -> Option<R>
233    where
234        ShortString: Coerce<'a, R>,
235    {
236        self.properties.retrieve_message_id()
237    }
238
239    /// Reports the kind of this message, if present, [coercing](Coerce) it into
240    /// type `R`, if supported.
241    pub fn kind<'a, R>(&'a self) -> Option<R>
242    where
243        ShortString: Coerce<'a, R>,
244    {
245        self.properties.retrieve_kind()
246    }
247
248    /// Reports the user ID of this message, if present, [coercing](Coerce) it
249    /// into type `R`, if supported.
250    pub fn user_id<'a, R>(&'a self) -> Option<R>
251    where
252        ShortString: Coerce<'a, R>,
253    {
254        self.properties.retrieve_user_id()
255    }
256
257    /// Reports the app ID of this message, if present, [coercing](Coerce) it
258    /// into type `R`, if supported.
259    pub fn app_id<'a, R>(&'a self) -> Option<R>
260    where
261        ShortString: Coerce<'a, R>,
262    {
263        self.properties.retrieve_app_id()
264    }
265
266    /// Reports the cluster ID of this message, if present, [coercing](Coerce)
267    /// it into type `R`, if supported.
268    pub fn cluster_id<'a, R>(&'a self) -> Option<R>
269    where
270        ShortString: Coerce<'a, R>,
271    {
272        self.properties.retrieve_cluster_id()
273    }
274}
275
276impl<T> Envelope<T> {
277    /// Copies the bytes of this envelope into a new [`DispatchBuilder`], which
278    /// will represent a message almost identical to the one that created this
279    /// initial envelope.
280    ///
281    /// The intention of this method is to enable sending an incoming envelope
282    /// back to RabbitMQ, unchanged, e.g., for the purpose of retrying the
283    /// processing of the message.
284    pub fn dispatch_builder(&self) -> DispatchBuilder {
285        // Clone main building blocks
286        let bytes = self.bytes.clone();
287        let properties = self.properties.clone();
288
289        DispatchBuilder::from_bytes_and_properties(bytes, properties)
290    }
291
292    /// [Completes](FinalizationKind::Complete) the incoming message represented
293    /// by this [`Envelope`]. If this envelope has already been
294    /// [finalized](FinalizationKind) (either
295    /// [automatically](crate::AckingBehavior::Auto), or manually via a method
296    /// like this one), this method is a no-op.
297    pub async fn complete(self) {
298        let optional_acker = self.acker.lock().take();
299        if let Some(ref acker) = optional_acker {
300            complete_delivery(self.subscriber.as_ref(), acker, &self.bytes).await;
301        }
302    }
303
304    /// [Backwashes](FinalizationKind::Backwash) the incoming message
305    /// represented by this [`Envelope`]. If this envelope has already been
306    /// [finalized](FinalizationKind) (either
307    /// [automatically](crate::AckingBehavior::Auto), or manually via a method
308    /// like this one), this method is a no-op.
309    pub async fn backwash(self) {
310        let optional_acker = self.acker.lock().take();
311        if let Some(ref acker) = optional_acker {
312            backwash_delivery(self.subscriber.as_ref(), acker, &self.bytes).await;
313        }
314    }
315
316    /// [Abandons](FinalizationKind::Abandon) the incoming message represented
317    /// by this [`Envelope`]. If this envelope has already been
318    /// [finalized](FinalizationKind) (either
319    /// [automatically](crate::AckingBehavior::Auto), or manually via a method
320    /// like this one), this method is a no-op.
321    pub async fn abandon(self) {
322        let optional_acker = self.acker.lock().take();
323        if let Some(ref acker) = optional_acker {
324            abandon_delivery(self.subscriber.as_ref(), acker, &self.bytes).await;
325        }
326    }
327}
328
329impl<T> Envelope<T>
330where
331    T: Default,
332{
333    /// Consumes this envelope and returns its building blocks for manual
334    /// handling. This allows to effectively opt out of this crate’s handling
335    /// mechanisms.
336    ///
337    /// This function is being considered for the public API. The signature of
338    /// this method is intentionally clunky, as it is not intended for widespread
339    /// use.
340    ///
341    /// Because [`Envelope`] implements [`Drop`], it cannot be simply destructured.
342    /// We have to [`take`](std::mem::take) every element out of it, and this means
343    /// that every element (including `T`) must implement [`Default`]. The caller
344    /// must anticipate that a new, default instance of `T` will be constructed
345    /// in this method call.
346    #[allow(dead_code)]
347    pub(crate) fn destruct(mut self) -> (Vec<u8>, AMQPProperties, Option<Acker>, T) {
348        // Pick `self` apart
349        let bytes = std::mem::take(&mut self.bytes);
350        let properties = std::mem::take(&mut self.properties);
351        let acker = self.acker.lock().take();
352        let content = std::mem::take(&mut self.payload);
353
354        // Return the parts
355        (bytes, properties, acker, content)
356    }
357}
358
359#[cfg(test)]
360impl Envelope<()> {
361    /// Creates a new instance with given [`AMQPProperties`].
362    pub fn test_dud(properties: AMQPProperties) -> Self {
363        use crate::util::Morph;
364
365        Self {
366            subscriber: Arc::from("test"),
367            delivery_tag: 0,
368            exchange: ShortString::morph(""),
369            routing_key: ShortString::morph(""),
370            is_redelivered: false,
371            properties,
372            bytes: vec![],
373            acker: SyncMutex::new(None),
374            payload: (),
375        }
376    }
377}
378
379impl<T> Drop for Envelope<T> {
380    fn drop(&mut self) {
381        if self.acker.lock().is_some() {
382            error!(
383                alert = true,
384                byte_preview = String::from_utf8_lossy(&self.bytes).as_ref(),
385                "Dropped an envelope without finalizing it",
386            );
387        }
388    }
389}