strut_rabbitmq/transport/outbound/dispatch.rs
1use crate::util::{
2 Attempt, Morph, Push, PushAppId, PushClusterId, PushContentEncoding, PushContentType,
3 PushCorrelationId, PushExpiration, PushKind, PushMessageId, PushReplyTo, PushUserId,
4 RetrievePushMap,
5};
6use crate::DeliveryMode;
7use lapin::protocol::basic::AMQPProperties;
8use lapin::types::{AMQPValue, FieldTable, ShortString};
9use std::borrow::Cow;
10
11/// Represents an **outgoing** RabbitMQ message.
12///
13/// This dispatch owns only the encoded bytes of the outgoing payload, but not
14/// the payload itself. Furthermore, this dispatch provides no facilities for
15/// encoding the payload.
16///
17/// It is possible to directly [create](crate::Envelope::dispatch_builder) a
18/// [`DispatchBuilder`] from an [`Envelope`].
19#[derive(Debug)]
20pub struct Dispatch {
21 bytes: Vec<u8>,
22 properties: AMQPProperties,
23 routing_key: Option<String>,
24}
25
26impl Dispatch {
27 /// Creates a new [`DispatchBuilder`].
28 pub fn builder() -> DispatchBuilder {
29 DispatchBuilder::new()
30 }
31
32 /// Shorthand for creating a [`Dispatch`] with the payload set to the given
33 /// bytes.
34 ///
35 /// This method is specifically made to take an owned `Vec<u8>`, to make sure
36 /// no copying occurs and the bytes are simply moved into this dispatch.
37 ///
38 /// When copying of bytes is acceptable or desired, use
39 /// [`from_byte_ref`](Dispatch::from_byte_ref).
40 pub fn from_bytes(bytes: Vec<u8>) -> Self {
41 Self::builder().with_bytes(bytes).build()
42 }
43
44 /// Shorthand for creating a [`Dispatch`] by copying the given bytes to the
45 /// payload.
46 pub fn from_byte_ref(bytes: impl AsRef<[u8]>) -> Self {
47 Self::builder().with_byte_ref(bytes).build()
48 }
49
50 /// Creates a new outgoing dispatch with the provided contents.
51 fn new(bytes: Vec<u8>, properties: AMQPProperties, routing_key: Option<String>) -> Self {
52 Self {
53 bytes,
54 properties,
55 routing_key,
56 }
57 }
58}
59
60impl Dispatch {
61 /// Exposes the encoded content of this message.
62 pub fn bytes(&self) -> &[u8] {
63 &self.bytes
64 }
65
66 /// Exposes the properties content of this message.
67 ///
68 /// This getter is for internal use.
69 pub(crate) fn properties(&self) -> AMQPProperties {
70 self.properties.clone()
71 }
72
73 /// Exposes the routing key that is to be used just for this message.
74 pub fn routing_key(&self) -> Option<&str> {
75 self.routing_key.as_deref()
76 }
77}
78
79/// Convenience implementations of [`From`] for [`Dispatch`].
80const _: () = {
81 impl From<String> for Dispatch {
82 fn from(value: String) -> Self {
83 Dispatch::from_bytes(value.into_bytes())
84 }
85 }
86
87 impl From<&str> for Dispatch {
88 fn from(value: &str) -> Self {
89 Dispatch::from_byte_ref(value.as_bytes())
90 }
91 }
92
93 impl From<Vec<u8>> for Dispatch {
94 fn from(value: Vec<u8>) -> Self {
95 Dispatch::from_bytes(value)
96 }
97 }
98
99 impl From<Box<[u8]>> for Dispatch {
100 fn from(value: Box<[u8]>) -> Self {
101 Dispatch::from_bytes(value.into())
102 }
103 }
104
105 impl From<&[u8]> for Dispatch {
106 fn from(value: &[u8]) -> Self {
107 Dispatch::from_byte_ref(value)
108 }
109 }
110
111 impl<'a> From<Cow<'a, str>> for Dispatch {
112 fn from(value: Cow<'a, str>) -> Self {
113 Dispatch::from_bytes(value.into_owned().into_bytes())
114 }
115 }
116
117 impl<'a> From<Cow<'a, [u8]>> for Dispatch {
118 fn from(value: Cow<'a, [u8]>) -> Self {
119 Dispatch::from_bytes(value.into_owned())
120 }
121 }
122};
123
124/// Allows building an **outgoing** RabbitMQ [`Dispatch`] iteratively.
125///
126/// Expects that the payload has been converted to a vector of bytes.
127pub struct DispatchBuilder {
128 bytes: Vec<u8>,
129 properties: AMQPProperties,
130 headers: FieldTable,
131 routing_key: Option<String>,
132}
133
134impl DispatchBuilder {
135 /// Creates a new [`Dispatch`] builder.
136 pub fn new() -> Self {
137 Self {
138 bytes: Vec::new(),
139 properties: AMQPProperties::default(),
140 headers: FieldTable::default(),
141 routing_key: None,
142 }
143 }
144
145 /// Creates a builder from the given vector of bytes and the given parts of
146 /// a destructed [`Envelope`].
147 ///
148 /// This factory is for internal use.
149 pub(crate) fn from_bytes_and_properties(bytes: Vec<u8>, properties: AMQPProperties) -> Self {
150 let headers: FieldTable = properties.headers().clone().unwrap_or_default();
151 Self {
152 bytes,
153 properties,
154 headers,
155 routing_key: None,
156 }
157 }
158
159 /// Sets the payload of this [`Dispatch`] to the given bytes.
160 ///
161 /// This method is specifically made to take an owned `Vec<u8>`, to make sure
162 /// no copying occurs and the bytes are simply moved into this dispatch.
163 ///
164 /// When copying of bytes is acceptable or desired, use
165 /// [`with_byte_ref`](DispatchBuilder::with_byte_ref).
166 pub fn with_bytes(mut self, bytes: Vec<u8>) -> Self {
167 self.bytes = bytes;
168
169 self
170 }
171
172 /// Copies the given bytes to the payload of this [`Dispatch`].
173 pub fn with_byte_ref(mut self, bytes: impl AsRef<[u8]>) -> Self {
174 self.bytes = bytes.as_ref().to_vec();
175
176 self
177 }
178
179 /// Sets the durability flag in the [`AMQPProperties`] of this [`Dispatch`]
180 /// to [“durable”](DeliveryMode::Durable).
181 pub fn durable(mut self) -> Self {
182 self.properties = self.properties.push_delivery_mode(DeliveryMode::Durable);
183
184 self
185 }
186
187 /// Sets the durability flag in the [`AMQPProperties`] of this [`Dispatch`]
188 /// to [“transient”](DeliveryMode::Transient).
189 pub fn transient(mut self) -> Self {
190 self.properties = self.properties.push_delivery_mode(DeliveryMode::Transient);
191
192 self
193 }
194
195 /// Sets the priority in the [`AMQPProperties`] of this [`Dispatch`] to
196 /// the given value.
197 pub fn with_priority(mut self, priority: u8) -> Self {
198 self.properties = self.properties.push_priority(priority);
199
200 self
201 }
202
203 /// Sets the timestamp in the [`AMQPProperties`] of this [`Dispatch`] to
204 /// the given value.
205 pub fn with_timestamp(mut self, timestamp: u64) -> Self {
206 self.properties = self.properties.push_timestamp(timestamp);
207
208 self
209 }
210
211 /// Sets the timestamp in the [`AMQPProperties`] of this [`Dispatch`] to
212 /// the current timestamp.
213 pub fn with_current_timestamp(mut self) -> Self {
214 self.properties = self.properties.push_current_timestamp();
215
216 self
217 }
218
219 /// Sets the attempt header of this [`Dispatch`] to the given value.
220 pub fn with_attempt(mut self, attempt: u32) -> Self {
221 self.headers.push_attempt(attempt);
222
223 self
224 }
225
226 /// Increments the attempt header of this [`Dispatch`] by one. If the attempt
227 /// is not yet present, it will be assumed to be zero, and will be incremented
228 /// to one.
229 pub fn with_incremented_attempt(mut self) -> Self {
230 self.headers.increment_attempt();
231
232 self
233 }
234
235 /// Sets the content type of this [`Dispatch`] to the given value.
236 ///
237 /// ## Example
238 ///
239 /// ```
240 /// use strut_rabbitmq::Dispatch;
241 ///
242 /// let dispatch = Dispatch::builder().with_content_type("application/json").build();
243 /// ```
244 pub fn with_content_type<T>(mut self, content_type: T) -> Self
245 where
246 ShortString: Morph<T>,
247 {
248 self.properties = self.properties.push_content_type(content_type);
249
250 self
251 }
252
253 /// Sets the content encoding of this [`Dispatch`] to the given value.
254 ///
255 /// ## Example
256 ///
257 /// ```
258 /// use strut_rabbitmq::Dispatch;
259 ///
260 /// let dispatch = Dispatch::builder().with_content_encoding("gzip").build();
261 /// ```
262 pub fn with_content_encoding<T>(mut self, content_encoding: T) -> Self
263 where
264 ShortString: Morph<T>,
265 {
266 self.properties = self.properties.push_content_encoding(content_encoding);
267
268 self
269 }
270
271 /// Sets a header of this [`Dispatch`] under the given `key` to the given
272 /// `value`.
273 ///
274 /// ## Example
275 ///
276 /// ```
277 /// use strut_rabbitmq::Dispatch;
278 ///
279 /// let dispatch = Dispatch::builder()
280 /// .with_header("created_via", "strut")
281 /// .with_header("processed_in_secs", 12.9)
282 /// .build();
283 /// ```
284 pub fn with_header<T>(mut self, key: &str, value: T) -> Self
285 where
286 AMQPValue: Morph<T>,
287 {
288 self.headers.push(key, value);
289
290 self
291 }
292
293 /// Sets the correlation ID of this [`Dispatch`] to the given value.
294 ///
295 /// ## Example
296 ///
297 /// ```
298 /// use strut_rabbitmq::Dispatch;
299 ///
300 /// let dispatch = Dispatch::builder().with_correlation_id(4489).build();
301 /// ```
302 pub fn with_correlation_id<T>(mut self, correlation_id: T) -> Self
303 where
304 ShortString: Morph<T>,
305 {
306 self.properties = self.properties.push_correlation_id(correlation_id);
307
308 self
309 }
310
311 /// Sets the “reply-to” value of this [`Dispatch`] to the given value.
312 ///
313 /// ## Example
314 ///
315 /// ```
316 /// use strut_rabbitmq::Dispatch;
317 ///
318 /// let dispatch = Dispatch::builder().with_reply_to("me").build();
319 /// ```
320 pub fn with_reply_to<T>(mut self, reply_to: T) -> Self
321 where
322 ShortString: Morph<T>,
323 {
324 self.properties = self.properties.push_reply_to(reply_to);
325
326 self
327 }
328
329 /// Sets the expiration of this [`Dispatch`] to the given value.
330 ///
331 /// ## Example
332 ///
333 /// ```
334 /// use strut_rabbitmq::Dispatch;
335 ///
336 /// let dispatch = Dispatch::builder().with_expiration(86400).build();
337 /// ```
338 pub fn with_expiration<T>(mut self, expiration: T) -> Self
339 where
340 ShortString: Morph<T>,
341 {
342 self.properties = self.properties.push_expiration(expiration);
343
344 self
345 }
346
347 /// Sets the message ID of this [`Dispatch`] to the given value.
348 ///
349 /// ## Example
350 ///
351 /// ```
352 /// use strut_rabbitmq::Dispatch;
353 ///
354 /// let dispatch = Dispatch::builder().with_message_id(664833405).build();
355 /// ```
356 pub fn with_message_id<T>(mut self, message_id: T) -> Self
357 where
358 ShortString: Morph<T>,
359 {
360 self.properties = self.properties.push_message_id(message_id);
361
362 self
363 }
364
365 /// Sets the kind of this [`Dispatch`] to the given value.
366 ///
367 /// ## Example
368 ///
369 /// ```
370 /// use strut_rabbitmq::Dispatch;
371 ///
372 /// let dispatch = Dispatch::builder().with_kind("priority").build();
373 /// ```
374 pub fn with_kind<T>(mut self, kind: T) -> Self
375 where
376 ShortString: Morph<T>,
377 {
378 self.properties = self.properties.push_kind(kind);
379
380 self
381 }
382
383 /// Sets the user ID of this [`Dispatch`] to the given value.
384 ///
385 /// ## Example
386 ///
387 /// ```
388 /// use strut_rabbitmq::Dispatch;
389 ///
390 /// let dispatch = Dispatch::builder()
391 /// .with_user_id("01522090-f465-4a44-bd3d-9e06c061f6ac")
392 /// .build();
393 /// ```
394 pub fn with_user_id<T>(mut self, user_id: T) -> Self
395 where
396 ShortString: Morph<T>,
397 {
398 self.properties = self.properties.push_user_id(user_id);
399
400 self
401 }
402
403 /// Sets the app ID of this [`Dispatch`] to the given value.
404 ///
405 /// ## Example
406 ///
407 /// ```
408 /// use strut_rabbitmq::Dispatch;
409 ///
410 /// let dispatch = Dispatch::builder().with_app_id("app_17").build();
411 /// ```
412 pub fn with_app_id<T>(mut self, app_id: T) -> Self
413 where
414 ShortString: Morph<T>,
415 {
416 self.properties = self.properties.push_app_id(app_id);
417
418 self
419 }
420
421 /// Sets the cluster ID of this [`Dispatch`] to the given value.
422 ///
423 /// ## Example
424 ///
425 /// ```
426 /// use strut_rabbitmq::Dispatch;
427 ///
428 /// let dispatch = Dispatch::builder().with_cluster_id(7747).build();
429 /// ```
430 pub fn with_cluster_id<T>(mut self, cluster_id: T) -> Self
431 where
432 ShortString: Morph<T>,
433 {
434 self.properties = self.properties.push_cluster_id(cluster_id);
435
436 self
437 }
438
439 /// Defines a routing key to be used just for this dispatch.
440 ///
441 /// If this method is never called, the [`Publisher`](crate::Publisher)
442 /// instead uses the routing key [configured](crate::Egress::routing_key) on
443 /// the [`Egress`](crate::Egress).
444 pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
445 self.routing_key = Some(routing_key.into());
446
447 self
448 }
449
450 /// Builds the [`Dispatch`].
451 pub fn build(self) -> Dispatch {
452 // Destructure self
453 let DispatchBuilder {
454 bytes,
455 mut properties,
456 headers,
457 ..
458 } = self;
459
460 // Put the headers into the properties
461 properties = properties.with_headers(headers);
462
463 Dispatch::new(bytes, properties, self.routing_key)
464 }
465}