strut_rabbitmq/transport/outbound/
dispatch.rs

1use crate::util::{
2    Attempt, Morph, Push, PushAppId, PushClusterId, PushContentEncoding, PushContentType,
3    PushCorrelationId, PushExpiration, PushKind, PushMessageId, PushReplyTo, PushUserId,
4    RetrievePushMap,
5};
6use crate::DeliveryMode;
7use lapin::protocol::basic::AMQPProperties;
8use lapin::types::{AMQPValue, FieldTable, ShortString};
9use std::borrow::Cow;
10
11/// Represents an **outgoing** RabbitMQ message.
12///
13/// This dispatch owns only the encoded bytes of the outgoing payload, but not
14/// the payload itself. Furthermore, this dispatch provides no facilities for
15/// encoding the payload.
16///
17/// It is possible to directly [create](crate::Envelope::dispatch_builder) a
18/// [`DispatchBuilder`] from an [`Envelope`].
19#[derive(Debug)]
20pub struct Dispatch {
21    bytes: Vec<u8>,
22    properties: AMQPProperties,
23    routing_key: Option<String>,
24}
25
26impl Dispatch {
27    /// Creates a new [`DispatchBuilder`].
28    pub fn builder() -> DispatchBuilder {
29        DispatchBuilder::new()
30    }
31
32    /// Shorthand for creating a [`Dispatch`] with the payload set to the given
33    /// bytes.
34    ///
35    /// This method is specifically made to take an owned `Vec<u8>`, to make sure
36    /// no copying occurs and the bytes are simply moved into this dispatch.
37    ///
38    /// When copying of bytes is acceptable or desired, use
39    /// [`from_byte_ref`](Dispatch::from_byte_ref).
40    pub fn from_bytes(bytes: Vec<u8>) -> Self {
41        Self::builder().with_bytes(bytes).build()
42    }
43
44    /// Shorthand for creating a [`Dispatch`] by copying the given bytes to the
45    /// payload.
46    pub fn from_byte_ref(bytes: impl AsRef<[u8]>) -> Self {
47        Self::builder().with_byte_ref(bytes).build()
48    }
49
50    /// Creates a new outgoing dispatch with the provided contents.
51    fn new(bytes: Vec<u8>, properties: AMQPProperties, routing_key: Option<String>) -> Self {
52        Self {
53            bytes,
54            properties,
55            routing_key,
56        }
57    }
58}
59
60impl Dispatch {
61    /// Exposes the encoded content of this message.
62    pub fn bytes(&self) -> &[u8] {
63        &self.bytes
64    }
65
66    /// Exposes the properties content of this message.
67    ///
68    /// This getter is for internal use.
69    pub(crate) fn properties(&self) -> AMQPProperties {
70        self.properties.clone()
71    }
72
73    /// Exposes the routing key that is to be used just for this message.
74    pub fn routing_key(&self) -> Option<&str> {
75        self.routing_key.as_deref()
76    }
77}
78
79/// Convenience implementations of [`From`] for [`Dispatch`].
80const _: () = {
81    impl From<String> for Dispatch {
82        fn from(value: String) -> Self {
83            Dispatch::from_bytes(value.into_bytes())
84        }
85    }
86
87    impl From<&str> for Dispatch {
88        fn from(value: &str) -> Self {
89            Dispatch::from_byte_ref(value.as_bytes())
90        }
91    }
92
93    impl From<Vec<u8>> for Dispatch {
94        fn from(value: Vec<u8>) -> Self {
95            Dispatch::from_bytes(value)
96        }
97    }
98
99    impl From<Box<[u8]>> for Dispatch {
100        fn from(value: Box<[u8]>) -> Self {
101            Dispatch::from_bytes(value.into())
102        }
103    }
104
105    impl From<&[u8]> for Dispatch {
106        fn from(value: &[u8]) -> Self {
107            Dispatch::from_byte_ref(value)
108        }
109    }
110
111    impl<'a> From<Cow<'a, str>> for Dispatch {
112        fn from(value: Cow<'a, str>) -> Self {
113            Dispatch::from_bytes(value.into_owned().into_bytes())
114        }
115    }
116
117    impl<'a> From<Cow<'a, [u8]>> for Dispatch {
118        fn from(value: Cow<'a, [u8]>) -> Self {
119            Dispatch::from_bytes(value.into_owned())
120        }
121    }
122};
123
124/// Allows building an **outgoing** RabbitMQ [`Dispatch`] iteratively.
125///
126/// Expects that the payload has been converted to a vector of bytes.
127pub struct DispatchBuilder {
128    bytes: Vec<u8>,
129    properties: AMQPProperties,
130    headers: FieldTable,
131    routing_key: Option<String>,
132}
133
134impl DispatchBuilder {
135    /// Creates a new [`Dispatch`] builder.
136    pub fn new() -> Self {
137        Self {
138            bytes: Vec::new(),
139            properties: AMQPProperties::default(),
140            headers: FieldTable::default(),
141            routing_key: None,
142        }
143    }
144
145    /// Creates a builder from the given vector of bytes and the given parts of
146    /// a destructed [`Envelope`].
147    ///
148    /// This factory is for internal use.
149    pub(crate) fn from_bytes_and_properties(bytes: Vec<u8>, properties: AMQPProperties) -> Self {
150        let headers: FieldTable = properties.headers().clone().unwrap_or_default();
151        Self {
152            bytes,
153            properties,
154            headers,
155            routing_key: None,
156        }
157    }
158
159    /// Sets the payload of this [`Dispatch`] to the given bytes.
160    ///
161    /// This method is specifically made to take an owned `Vec<u8>`, to make sure
162    /// no copying occurs and the bytes are simply moved into this dispatch.
163    ///
164    /// When copying of bytes is acceptable or desired, use
165    /// [`with_byte_ref`](DispatchBuilder::with_byte_ref).
166    pub fn with_bytes(mut self, bytes: Vec<u8>) -> Self {
167        self.bytes = bytes;
168
169        self
170    }
171
172    /// Copies the given bytes to the payload of this [`Dispatch`].
173    pub fn with_byte_ref(mut self, bytes: impl AsRef<[u8]>) -> Self {
174        self.bytes = bytes.as_ref().to_vec();
175
176        self
177    }
178
179    /// Sets the durability flag in the [`AMQPProperties`] of this [`Dispatch`]
180    /// to [“durable”](DeliveryMode::Durable).
181    pub fn durable(mut self) -> Self {
182        self.properties = self.properties.push_delivery_mode(DeliveryMode::Durable);
183
184        self
185    }
186
187    /// Sets the durability flag in the [`AMQPProperties`] of this [`Dispatch`]
188    /// to [“transient”](DeliveryMode::Transient).
189    pub fn transient(mut self) -> Self {
190        self.properties = self.properties.push_delivery_mode(DeliveryMode::Transient);
191
192        self
193    }
194
195    /// Sets the priority in the [`AMQPProperties`] of this [`Dispatch`] to
196    /// the given value.
197    pub fn with_priority(mut self, priority: u8) -> Self {
198        self.properties = self.properties.push_priority(priority);
199
200        self
201    }
202
203    /// Sets the timestamp in the [`AMQPProperties`] of this [`Dispatch`] to
204    /// the given value.
205    pub fn with_timestamp(mut self, timestamp: u64) -> Self {
206        self.properties = self.properties.push_timestamp(timestamp);
207
208        self
209    }
210
211    /// Sets the timestamp in the [`AMQPProperties`] of this [`Dispatch`] to
212    /// the current timestamp.
213    pub fn with_current_timestamp(mut self) -> Self {
214        self.properties = self.properties.push_current_timestamp();
215
216        self
217    }
218
219    /// Sets the attempt header of this [`Dispatch`] to the given value.
220    pub fn with_attempt(mut self, attempt: u32) -> Self {
221        self.headers.push_attempt(attempt);
222
223        self
224    }
225
226    /// Increments the attempt header of this [`Dispatch`] by one. If the attempt
227    /// is not yet present, it will be assumed to be zero, and will be incremented
228    /// to one.
229    pub fn with_incremented_attempt(mut self) -> Self {
230        self.headers.increment_attempt();
231
232        self
233    }
234
235    /// Sets the content type of this [`Dispatch`] to the given value.
236    ///
237    /// ## Example
238    ///
239    /// ```
240    /// use strut_rabbitmq::Dispatch;
241    ///
242    /// let dispatch = Dispatch::builder().with_content_type("application/json").build();
243    /// ```
244    pub fn with_content_type<T>(mut self, content_type: T) -> Self
245    where
246        ShortString: Morph<T>,
247    {
248        self.properties = self.properties.push_content_type(content_type);
249
250        self
251    }
252
253    /// Sets the content encoding of this [`Dispatch`] to the given value.
254    ///
255    /// ## Example
256    ///
257    /// ```
258    /// use strut_rabbitmq::Dispatch;
259    ///
260    /// let dispatch = Dispatch::builder().with_content_encoding("gzip").build();
261    /// ```
262    pub fn with_content_encoding<T>(mut self, content_encoding: T) -> Self
263    where
264        ShortString: Morph<T>,
265    {
266        self.properties = self.properties.push_content_encoding(content_encoding);
267
268        self
269    }
270
271    /// Sets a header of this [`Dispatch`] under the given `key` to the given
272    /// `value`.
273    ///
274    /// ## Example
275    ///
276    /// ```
277    /// use strut_rabbitmq::Dispatch;
278    ///
279    /// let dispatch = Dispatch::builder()
280    ///     .with_header("created_via", "strut")
281    ///     .with_header("processed_in_secs", 12.9)
282    ///     .build();
283    /// ```
284    pub fn with_header<T>(mut self, key: &str, value: T) -> Self
285    where
286        AMQPValue: Morph<T>,
287    {
288        self.headers.push(key, value);
289
290        self
291    }
292
293    /// Sets the correlation ID of this [`Dispatch`] to the given value.
294    ///
295    /// ## Example
296    ///
297    /// ```
298    /// use strut_rabbitmq::Dispatch;
299    ///
300    /// let dispatch = Dispatch::builder().with_correlation_id(4489).build();
301    /// ```
302    pub fn with_correlation_id<T>(mut self, correlation_id: T) -> Self
303    where
304        ShortString: Morph<T>,
305    {
306        self.properties = self.properties.push_correlation_id(correlation_id);
307
308        self
309    }
310
311    /// Sets the “reply-to” value of this [`Dispatch`] to the given value.
312    ///
313    /// ## Example
314    ///
315    /// ```
316    /// use strut_rabbitmq::Dispatch;
317    ///
318    /// let dispatch = Dispatch::builder().with_reply_to("me").build();
319    /// ```
320    pub fn with_reply_to<T>(mut self, reply_to: T) -> Self
321    where
322        ShortString: Morph<T>,
323    {
324        self.properties = self.properties.push_reply_to(reply_to);
325
326        self
327    }
328
329    /// Sets the expiration of this [`Dispatch`] to the given value.
330    ///
331    /// ## Example
332    ///
333    /// ```
334    /// use strut_rabbitmq::Dispatch;
335    ///
336    /// let dispatch = Dispatch::builder().with_expiration(86400).build();
337    /// ```
338    pub fn with_expiration<T>(mut self, expiration: T) -> Self
339    where
340        ShortString: Morph<T>,
341    {
342        self.properties = self.properties.push_expiration(expiration);
343
344        self
345    }
346
347    /// Sets the message ID of this [`Dispatch`] to the given value.
348    ///
349    /// ## Example
350    ///
351    /// ```
352    /// use strut_rabbitmq::Dispatch;
353    ///
354    /// let dispatch = Dispatch::builder().with_message_id(664833405).build();
355    /// ```
356    pub fn with_message_id<T>(mut self, message_id: T) -> Self
357    where
358        ShortString: Morph<T>,
359    {
360        self.properties = self.properties.push_message_id(message_id);
361
362        self
363    }
364
365    /// Sets the kind of this [`Dispatch`] to the given value.
366    ///
367    /// ## Example
368    ///
369    /// ```
370    /// use strut_rabbitmq::Dispatch;
371    ///
372    /// let dispatch = Dispatch::builder().with_kind("priority").build();
373    /// ```
374    pub fn with_kind<T>(mut self, kind: T) -> Self
375    where
376        ShortString: Morph<T>,
377    {
378        self.properties = self.properties.push_kind(kind);
379
380        self
381    }
382
383    /// Sets the user ID of this [`Dispatch`] to the given value.
384    ///
385    /// ## Example
386    ///
387    /// ```
388    /// use strut_rabbitmq::Dispatch;
389    ///
390    /// let dispatch = Dispatch::builder()
391    ///     .with_user_id("01522090-f465-4a44-bd3d-9e06c061f6ac")
392    ///     .build();
393    /// ```
394    pub fn with_user_id<T>(mut self, user_id: T) -> Self
395    where
396        ShortString: Morph<T>,
397    {
398        self.properties = self.properties.push_user_id(user_id);
399
400        self
401    }
402
403    /// Sets the app ID of this [`Dispatch`] to the given value.
404    ///
405    /// ## Example
406    ///
407    /// ```
408    /// use strut_rabbitmq::Dispatch;
409    ///
410    /// let dispatch = Dispatch::builder().with_app_id("app_17").build();
411    /// ```
412    pub fn with_app_id<T>(mut self, app_id: T) -> Self
413    where
414        ShortString: Morph<T>,
415    {
416        self.properties = self.properties.push_app_id(app_id);
417
418        self
419    }
420
421    /// Sets the cluster ID of this [`Dispatch`] to the given value.
422    ///
423    /// ## Example
424    ///
425    /// ```
426    /// use strut_rabbitmq::Dispatch;
427    ///
428    /// let dispatch = Dispatch::builder().with_cluster_id(7747).build();
429    /// ```
430    pub fn with_cluster_id<T>(mut self, cluster_id: T) -> Self
431    where
432        ShortString: Morph<T>,
433    {
434        self.properties = self.properties.push_cluster_id(cluster_id);
435
436        self
437    }
438
439    /// Defines a routing key to be used just for this dispatch.
440    ///
441    /// If this method is never called, the [`Publisher`](crate::Publisher)
442    /// instead uses the routing key [configured](crate::Egress::routing_key) on
443    /// the [`Egress`](crate::Egress).
444    pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
445        self.routing_key = Some(routing_key.into());
446
447        self
448    }
449
450    /// Builds the [`Dispatch`].
451    pub fn build(self) -> Dispatch {
452        // Destructure self
453        let DispatchBuilder {
454            bytes,
455            mut properties,
456            headers,
457            ..
458        } = self;
459
460        // Put the headers into the properties
461        properties = properties.with_headers(headers);
462
463        Dispatch::new(bytes, properties, self.routing_key)
464    }
465}