strut_rabbitmq/transport/inbound/envelope.rs
1use crate::transport::inbound::delivery::{abandon_delivery, backwash_delivery, complete_delivery};
2use crate::util::{
3 Coerce, RetrieveAppId, RetrieveClusterId, RetrieveContentEncoding, RetrieveContentType,
4 RetrieveCorrelationId, RetrieveExpiration, RetrieveHeader, RetrieveKind, RetrieveMessageId,
5 RetrievePushMap, RetrieveReplyTo, RetrieveUserId,
6};
7use crate::{Decoder, DeliveryMode, DispatchBuilder};
8use lapin::acker::Acker;
9use lapin::message::Delivery;
10use lapin::protocol::basic::AMQPProperties;
11use lapin::types::{AMQPValue, ShortString};
12use parking_lot::Mutex as SyncMutex;
13use std::sync::Arc;
14use tracing::error;
15
16pub mod stack;
17
18/// Represents an **incoming** RabbitMQ message.
19///
20/// This class owns both the bytes of the original message’s payload and the
21/// decoded (e.g., deserialized) payload `T`, along with the implementation
22/// details of the original [`Delivery`].
23#[derive(Debug)]
24pub struct Envelope<T> {
25 /// The name of the subscriber that received this message.
26 subscriber: Arc<str>,
27 /// The original delivery tag.
28 delivery_tag: u64,
29 /// The original target exchange used to send the message.
30 exchange: ShortString,
31 /// The original routing key used to send the message.
32 routing_key: ShortString,
33 /// The original redelivery flag.
34 is_redelivered: bool,
35 /// The original properties.
36 properties: AMQPProperties,
37 /// The original bytes.
38 bytes: Vec<u8>,
39 /// The acker associated with this message (optional for cases where messages
40 /// are acked [automatically](crate::AckingBehavior::Auto)).
41 acker: SyncMutex<Option<Acker>>,
42 /// The decoded content of the underlying message, stored alongside its
43 /// original bytes.
44 payload: T,
45}
46
47/// Represents a failed attempt to create an [`Envelope`] from a [`Decoder`] and
48/// a [`Delivery`].
49pub(crate) struct DecoderError<D>
50where
51 D: Decoder,
52{
53 /// The original bytes that were not decoded.
54 pub(crate) bytes: Vec<u8>,
55 /// The acker of the original message.
56 pub(crate) acker: Option<Acker>,
57 /// The decoder error.
58 pub(crate) error: D::Error,
59}
60
61impl<T> Envelope<T> {
62 /// Attempts to create an envelope from the given [`Delivery`] using the
63 /// provided [`Decoder`] implementation for interpreting the message payload.
64 ///
65 /// The given `is_pending` flag indicates whether this message is not yet
66 /// finalized and thus, whether the [`Acker`] should be extracted and used.
67 pub(crate) fn try_from<D>(
68 subscriber: Arc<str>,
69 decoder: &D,
70 delivery: Delivery,
71 is_pending: bool,
72 ) -> Result<Envelope<T>, DecoderError<D>>
73 where
74 D: Decoder<Result = T>,
75 {
76 // Destructure inputs
77 let Delivery {
78 delivery_tag,
79 exchange,
80 routing_key,
81 redelivered: is_redelivered,
82 properties,
83 data: bytes,
84 acker,
85 } = delivery;
86 let acker = is_pending.then_some(acker);
87
88 // Attempt to decode the given bytes with the given decoder
89 match decoder.decode(&bytes) {
90 // Successfully decoded
91 Ok(payload) => Ok(Self {
92 subscriber,
93 delivery_tag,
94 exchange,
95 routing_key,
96 is_redelivered,
97 bytes,
98 properties,
99 acker: SyncMutex::new(acker),
100 payload,
101 }),
102
103 // Failed to decode
104 Err(error) => Err(DecoderError {
105 bytes,
106 acker,
107 error,
108 }),
109 }
110 }
111}
112
113impl<T> Envelope<T> {
114 /// Exposes the delivery tag of the underlying incoming message.
115 pub fn delivery_tag(&self) -> u64 {
116 self.delivery_tag
117 }
118
119 /// Exposes the original target exchange used to send the underlying
120 /// incoming message.
121 pub fn exchange(&self) -> &str {
122 self.exchange.as_str()
123 }
124
125 /// Exposes the original routing key used to send the underlying incoming
126 /// message.
127 pub fn routing_key(&self) -> &str {
128 self.routing_key.as_str()
129 }
130
131 /// Exposes the original redelivery flag of the underlying incoming message.
132 ///
133 /// A message is redelivered when it has been previously dropped without
134 /// [finalizing](FinalizationKind), or if it was previously explicitly
135 /// [backwashed](FinalizationKind::Backwash).
136 pub fn is_redelivered(&self) -> bool {
137 self.is_redelivered
138 }
139
140 /// Exposes the original bytes of this message.
141 pub fn bytes(&self) -> &[u8] {
142 &self.bytes
143 }
144
145 /// Exposes the decoded payload of this message.
146 ///
147 /// Most [decoders](Decoder) derive the payload from the original
148 /// [bytes](Envelope::bytes) of this message. A notable exceptions is the
149 /// [`NoopDecoder`](crate::NoopDecoder).
150 pub fn payload(&self) -> &T {
151 &self.payload
152 }
153
154 /// Exposes the attempt number of the underlying incoming message, if present.
155 pub fn attempt(&self) -> Option<u32> {
156 self.properties.retrieve_attempt()
157 }
158
159 /// Exposes the delivery mode of the underlying incoming message, if present.
160 pub fn delivery_mode(&self) -> Option<DeliveryMode> {
161 self.properties.retrieve_delivery_mode()
162 }
163
164 /// Exposes the priority of the underlying incoming message, if present.
165 pub fn priority(&self) -> Option<u8> {
166 self.properties.retrieve_priority()
167 }
168
169 /// Exposes the timestamp of the underlying incoming message, if present.
170 pub fn timestamp(&self) -> Option<u64> {
171 self.properties.retrieve_timestamp()
172 }
173}
174
175impl<T> Envelope<T> {
176 /// Reports the content type of this message, if present, [coercing](Coerce)
177 /// it into type `R`, if supported.
178 pub fn content_type<'a, R>(&'a self) -> Option<R>
179 where
180 ShortString: Coerce<'a, R>,
181 {
182 self.properties.retrieve_content_type()
183 }
184
185 /// Reports the content encoding of this message, if present,
186 /// [coercing](Coerce) it into type `R`, if supported.
187 pub fn content_encoding<'a, R>(&'a self) -> Option<R>
188 where
189 ShortString: Coerce<'a, R>,
190 {
191 self.properties.retrieve_content_encoding()
192 }
193
194 /// Reports the header value from this message by key, if present,
195 /// [coercing](Coerce) it into type `R`, if supported.
196 pub fn header<'a, R>(&'a self, key: &str) -> Option<R>
197 where
198 AMQPValue: Coerce<'a, R>,
199 {
200 self.properties.retrieve_header(key)
201 }
202
203 /// Reports the correlation ID of this message, if present,
204 /// [coercing](Coerce) it into type `R`, if supported.
205 pub fn correlation_id<'a, R>(&'a self) -> Option<R>
206 where
207 ShortString: Coerce<'a, R>,
208 {
209 self.properties.retrieve_correlation_id()
210 }
211
212 /// Reports the “reply-to” value of this message, if present,
213 /// [coercing](Coerce) it into type `R`, if supported.
214 pub fn reply_to<'a, R>(&'a self) -> Option<R>
215 where
216 ShortString: Coerce<'a, R>,
217 {
218 self.properties.retrieve_reply_to()
219 }
220
221 /// Reports the expiration of this message, if present, [coercing](Coerce)
222 /// it into type `R`, if supported.
223 pub fn expiration<'a, R>(&'a self) -> Option<R>
224 where
225 ShortString: Coerce<'a, R>,
226 {
227 self.properties.retrieve_expiration()
228 }
229
230 /// Reports the ID of this message, if present, [coercing](Coerce) it into
231 /// type `R`, if supported.
232 pub fn message_id<'a, R>(&'a self) -> Option<R>
233 where
234 ShortString: Coerce<'a, R>,
235 {
236 self.properties.retrieve_message_id()
237 }
238
239 /// Reports the kind of this message, if present, [coercing](Coerce) it into
240 /// type `R`, if supported.
241 pub fn kind<'a, R>(&'a self) -> Option<R>
242 where
243 ShortString: Coerce<'a, R>,
244 {
245 self.properties.retrieve_kind()
246 }
247
248 /// Reports the user ID of this message, if present, [coercing](Coerce) it
249 /// into type `R`, if supported.
250 pub fn user_id<'a, R>(&'a self) -> Option<R>
251 where
252 ShortString: Coerce<'a, R>,
253 {
254 self.properties.retrieve_user_id()
255 }
256
257 /// Reports the app ID of this message, if present, [coercing](Coerce) it
258 /// into type `R`, if supported.
259 pub fn app_id<'a, R>(&'a self) -> Option<R>
260 where
261 ShortString: Coerce<'a, R>,
262 {
263 self.properties.retrieve_app_id()
264 }
265
266 /// Reports the cluster ID of this message, if present, [coercing](Coerce)
267 /// it into type `R`, if supported.
268 pub fn cluster_id<'a, R>(&'a self) -> Option<R>
269 where
270 ShortString: Coerce<'a, R>,
271 {
272 self.properties.retrieve_cluster_id()
273 }
274}
275
276impl<T> Envelope<T> {
277 /// Copies the bytes of this envelope into a new [`DispatchBuilder`], which
278 /// will represent a message almost identical to the one that created this
279 /// initial envelope.
280 ///
281 /// The intention of this method is to enable sending an incoming envelope
282 /// back to RabbitMQ, unchanged, e.g., for the purpose of retrying the
283 /// processing of the message.
284 pub fn dispatch_builder(&self) -> DispatchBuilder {
285 // Clone main building blocks
286 let bytes = self.bytes.clone();
287 let properties = self.properties.clone();
288
289 DispatchBuilder::from_bytes_and_properties(bytes, properties)
290 }
291
292 /// [Completes](FinalizationKind::Complete) the incoming message represented
293 /// by this [`Envelope`]. If this envelope has already been
294 /// [finalized](FinalizationKind) (either
295 /// [automatically](crate::AckingBehavior::Auto), or manually via a method
296 /// like this one), this method is a no-op.
297 pub async fn complete(self) {
298 let optional_acker = self.acker.lock().take();
299 if let Some(ref acker) = optional_acker {
300 complete_delivery(self.subscriber.as_ref(), acker, &self.bytes).await;
301 }
302 }
303
304 /// [Backwashes](FinalizationKind::Backwash) the incoming message
305 /// represented by this [`Envelope`]. If this envelope has already been
306 /// [finalized](FinalizationKind) (either
307 /// [automatically](crate::AckingBehavior::Auto), or manually via a method
308 /// like this one), this method is a no-op.
309 pub async fn backwash(self) {
310 let optional_acker = self.acker.lock().take();
311 if let Some(ref acker) = optional_acker {
312 backwash_delivery(self.subscriber.as_ref(), acker, &self.bytes).await;
313 }
314 }
315
316 /// [Abandons](FinalizationKind::Abandon) the incoming message represented
317 /// by this [`Envelope`]. If this envelope has already been
318 /// [finalized](FinalizationKind) (either
319 /// [automatically](crate::AckingBehavior::Auto), or manually via a method
320 /// like this one), this method is a no-op.
321 pub async fn abandon(self) {
322 let optional_acker = self.acker.lock().take();
323 if let Some(ref acker) = optional_acker {
324 abandon_delivery(self.subscriber.as_ref(), acker, &self.bytes).await;
325 }
326 }
327}
328
329impl<T> Envelope<T>
330where
331 T: Default,
332{
333 /// Consumes this envelope and returns its building blocks for manual
334 /// handling. This allows to effectively opt out of this crate’s handling
335 /// mechanisms.
336 ///
337 /// This function is being considered for the public API. The signature of
338 /// this method is intentionally clunky, as it is not intended for widespread
339 /// use.
340 ///
341 /// Because [`Envelope`] implements [`Drop`], it cannot be simply destructured.
342 /// We have to [`take`](std::mem::take) every element out of it, and this means
343 /// that every element (including `T`) must implement [`Default`]. The caller
344 /// must anticipate that a new, default instance of `T` will be constructed
345 /// in this method call.
346 #[allow(dead_code)]
347 pub(crate) fn destruct(mut self) -> (Vec<u8>, AMQPProperties, Option<Acker>, T) {
348 // Pick `self` apart
349 let bytes = std::mem::take(&mut self.bytes);
350 let properties = std::mem::take(&mut self.properties);
351 let acker = self.acker.lock().take();
352 let content = std::mem::take(&mut self.payload);
353
354 // Return the parts
355 (bytes, properties, acker, content)
356 }
357}
358
359#[cfg(test)]
360impl Envelope<()> {
361 /// Creates a new instance with given [`AMQPProperties`].
362 pub fn test_dud(properties: AMQPProperties) -> Self {
363 use crate::util::Morph;
364
365 Self {
366 subscriber: Arc::from("test"),
367 delivery_tag: 0,
368 exchange: ShortString::morph(""),
369 routing_key: ShortString::morph(""),
370 is_redelivered: false,
371 properties,
372 bytes: vec![],
373 acker: SyncMutex::new(None),
374 payload: (),
375 }
376 }
377}
378
379impl<T> Drop for Envelope<T> {
380 fn drop(&mut self) {
381 if self.acker.lock().is_some() {
382 error!(
383 alert = true,
384 byte_preview = String::from_utf8_lossy(&self.bytes).as_ref(),
385 "Dropped an envelope without finalizing it",
386 );
387 }
388 }
389}