1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{MassTransitEnvelope, MessageEnvelope},
5};
6use lapin::{
7 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
8 types::{AMQPValue, FieldTable},
9 BasicProperties, Channel, ExchangeKind,
10};
11use serde::Serialize;
12use std::sync::Arc;
13use tracing::debug;
14use url::Url;
15
16#[derive(Debug, Clone, Default)]
18pub struct PublishOptions {
19 pub mandatory: bool,
20 pub immediate: bool,
21 pub expiration: Option<String>,
22 pub priority: Option<u8>,
23 pub masstransit: Option<MassTransitOptions>,
25}
26
27#[derive(Debug, Clone)]
29pub struct MassTransitOptions {
30 pub message_type: String,
33 pub correlation_id: Option<String>,
35 pub source_address: Option<String>,
37 pub destination_address: Option<String>,
39}
40
41impl PublishOptions {
42 pub fn new() -> Self {
43 Self::default()
44 }
45
46 pub fn mandatory(mut self) -> Self {
47 self.mandatory = true;
48 self
49 }
50
51 pub fn priority(mut self, priority: u8) -> Self {
52 self.priority = Some(priority);
53 self
54 }
55
56 pub fn with_expiration(mut self, expiration: impl Into<String>) -> Self {
57 self.expiration = Some(expiration.into());
58 self
59 }
60
61 pub fn with_priority(mut self, priority: u8) -> Self {
62 self.priority = Some(priority);
63 self
64 }
65
66 pub fn with_masstransit(mut self, message_type: impl Into<String>) -> Self {
69 self.masstransit = Some(MassTransitOptions {
70 message_type: message_type.into(),
71 correlation_id: None,
72 source_address: None,
73 destination_address: None,
74 });
75 self
76 }
77
78 pub fn with_masstransit_options(mut self, options: MassTransitOptions) -> Self {
80 self.masstransit = Some(options);
81 self
82 }
83}
84
85impl MassTransitOptions {
86 pub fn new(message_type: impl Into<String>) -> Self {
88 Self {
89 message_type: message_type.into(),
90 correlation_id: None,
91 source_address: None,
92 destination_address: None,
93 }
94 }
95
96 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
98 self.correlation_id = Some(correlation_id.into());
99 self
100 }
101
102 pub fn with_source_address(mut self, source_address: impl Into<String>) -> Self {
104 self.source_address = Some(source_address.into());
105 self
106 }
107
108 pub fn with_destination_address(mut self, destination_address: impl Into<String>) -> Self {
110 self.destination_address = Some(destination_address.into());
111 self
112 }
113}
114
115pub struct Publisher {
117 connection: Arc<Connection>,
118}
119
120impl Publisher {
121 pub fn new(connection: Arc<Connection>) -> Self {
123 Self { connection }
124 }
125
126 pub async fn publish_to_exchange<T>(
128 &self,
129 exchange: &str,
130 routing_key: &str,
131 message: &T,
132 options: Option<PublishOptions>,
133 ) -> Result<(), RustRabbitError>
134 where
135 T: Serialize,
136 {
137 let channel = self.connection.create_channel().await?;
138
139 channel
141 .exchange_declare(
142 exchange,
143 ExchangeKind::Topic,
144 ExchangeDeclareOptions {
145 durable: true,
146 ..Default::default()
147 },
148 FieldTable::default(),
149 )
150 .await?;
151
152 self.publish_message(&channel, exchange, routing_key, message, options)
153 .await
154 }
155
156 pub async fn publish_to_queue<T>(
158 &self,
159 queue: &str,
160 message: &T,
161 options: Option<PublishOptions>,
162 ) -> Result<(), RustRabbitError>
163 where
164 T: Serialize,
165 {
166 let channel = self.connection.create_channel().await?;
167
168 channel
170 .queue_declare(
171 queue,
172 QueueDeclareOptions {
173 durable: true,
174 ..Default::default()
175 },
176 FieldTable::default(),
177 )
178 .await?;
179
180 self.publish_message(&channel, "", queue, message, options)
182 .await
183 }
184
185 async fn publish_message<T>(
189 &self,
190 channel: &Channel,
191 exchange: &str,
192 routing_key: &str,
193 message: &T,
194 options: Option<PublishOptions>,
195 ) -> Result<(), RustRabbitError>
196 where
197 T: Serialize,
198 {
199 let options = options.unwrap_or_default();
200
201 let payload = if let Some(mt_options) = &options.masstransit {
203 let mut envelope =
205 MassTransitEnvelope::with_message_type(message, &mt_options.message_type)
206 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
207
208 if let Some(corr_id) = &mt_options.correlation_id {
210 envelope = envelope.with_correlation_id(corr_id.clone());
211 }
212
213 let host = self
215 .connection
216 .url()
217 .parse::<Url>()
218 .ok()
219 .and_then(|url| url.host_str().map(|h| h.to_string()))
220 .unwrap_or_else(|| "localhost".to_string());
221
222 let source = mt_options
224 .source_address
225 .clone()
226 .unwrap_or_else(|| format!("rabbitmq://{}/{}", host, exchange));
227 envelope = envelope.with_source_address(source);
228
229 let dest = mt_options
231 .destination_address
232 .clone()
233 .unwrap_or_else(|| format!("rabbitmq://{}/{}", host, routing_key));
234 envelope = envelope.with_destination_address(dest);
235
236 serde_json::to_vec(&envelope)
238 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
239 } else {
240 serde_json::to_vec(message)
242 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
243 };
244
245 let mut headers = FieldTable::default();
248 headers.insert("x-retry-attempt".into(), AMQPValue::LongLongInt(0));
249
250 let mut properties = BasicProperties::default()
251 .with_content_type("application/json".into())
252 .with_delivery_mode(2) .with_headers(headers);
254
255 if let Some(expiration) = options.expiration {
256 properties = properties.with_expiration(expiration.into());
257 }
258
259 if let Some(priority) = options.priority {
260 properties = properties.with_priority(priority);
261 }
262
263 let confirm = channel
265 .basic_publish(
266 exchange,
267 routing_key,
268 BasicPublishOptions {
269 mandatory: options.mandatory,
270 immediate: options.immediate,
271 },
272 &payload,
273 properties,
274 )
275 .await?;
276
277 confirm.await?;
279
280 if options.masstransit.is_some() {
281 debug!(
282 "Published MassTransit message to exchange '{}' with routing key '{}'",
283 exchange, routing_key
284 );
285 } else {
286 debug!(
287 "Published message to exchange '{}' with routing key '{}'",
288 exchange, routing_key
289 );
290 }
291
292 Ok(())
293 }
294
295 pub async fn publish_envelope_to_exchange<T>(
297 &self,
298 exchange: &str,
299 routing_key: &str,
300 envelope: &MessageEnvelope<T>,
301 options: Option<PublishOptions>,
302 ) -> Result<(), RustRabbitError>
303 where
304 T: Serialize,
305 {
306 self.publish_to_exchange(exchange, routing_key, envelope, options)
307 .await
308 }
309
310 pub async fn publish_envelope_to_queue<T>(
312 &self,
313 queue: &str,
314 envelope: &MessageEnvelope<T>,
315 options: Option<PublishOptions>,
316 ) -> Result<(), RustRabbitError>
317 where
318 T: Serialize,
319 {
320 self.publish_to_queue(queue, envelope, options).await
321 }
322
323 pub async fn publish_with_envelope<T>(
325 &self,
326 exchange: &str,
327 routing_key: &str,
328 payload: &T,
329 source_queue: &str,
330 max_retries: u32,
331 options: Option<PublishOptions>,
332 ) -> Result<(), RustRabbitError>
333 where
334 T: Serialize + Clone,
335 {
336 let envelope = MessageEnvelope::with_source(
337 payload.clone(),
338 source_queue,
339 Some(exchange),
340 Some(routing_key),
341 Some("rust-rabbit-publisher"), )
343 .with_max_retries(max_retries);
344
345 self.publish_envelope_to_exchange(exchange, routing_key, &envelope, options)
346 .await
347 }
348
349 pub async fn publish_with_envelope_to_queue<T>(
351 &self,
352 queue: &str,
353 payload: &T,
354 max_retries: u32,
355 options: Option<PublishOptions>,
356 ) -> Result<(), RustRabbitError>
357 where
358 T: Serialize + Clone,
359 {
360 let envelope = MessageEnvelope::new(payload.clone(), queue).with_max_retries(max_retries);
361
362 self.publish_envelope_to_queue(queue, &envelope, options)
363 .await
364 }
365
366 pub async fn publish_masstransit_to_exchange<T>(
404 &self,
405 exchange: &str,
406 routing_key: &str,
407 message: &T,
408 message_type: &str,
409 options: Option<PublishOptions>,
410 ) -> Result<(), RustRabbitError>
411 where
412 T: Serialize,
413 {
414 let channel = self.connection.create_channel().await?;
415
416 channel
418 .exchange_declare(
419 exchange,
420 ExchangeKind::Topic,
421 ExchangeDeclareOptions {
422 durable: true,
423 ..Default::default()
424 },
425 FieldTable::default(),
426 )
427 .await?;
428
429 let host = self
431 .connection
432 .url()
433 .parse::<Url>()
434 .ok()
435 .and_then(|url| url.host_str().map(|h| h.to_string()))
436 .unwrap_or_else(|| "localhost".to_string());
437
438 let envelope = MassTransitEnvelope::with_message_type(message, message_type)
440 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
441 .with_source_address(format!("rabbitmq://{}/{}", host, exchange))
442 .with_destination_address(format!("rabbitmq://{}/{}", host, routing_key));
443
444 let payload = serde_json::to_vec(&envelope)
446 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
447
448 let options = options.unwrap_or_default();
450 let mut properties = BasicProperties::default()
451 .with_content_type("application/json".into())
452 .with_delivery_mode(2) .with_headers(FieldTable::default());
454
455 if let Some(expiration) = options.expiration {
456 properties = properties.with_expiration(expiration.into());
457 }
458
459 if let Some(priority) = options.priority {
460 properties = properties.with_priority(priority);
461 }
462
463 let confirm = channel
465 .basic_publish(
466 exchange,
467 routing_key,
468 BasicPublishOptions {
469 mandatory: options.mandatory,
470 immediate: options.immediate,
471 },
472 &payload,
473 properties,
474 )
475 .await?;
476
477 confirm.await?;
478
479 debug!(
480 "Published MassTransit message to exchange '{}' with routing key '{}' (type: {})",
481 exchange, routing_key, message_type
482 );
483
484 Ok(())
485 }
486
487 pub async fn publish_masstransit_to_queue<T>(
523 &self,
524 queue: &str,
525 message: &T,
526 message_type: &str,
527 options: Option<PublishOptions>,
528 ) -> Result<(), RustRabbitError>
529 where
530 T: Serialize,
531 {
532 let channel = self.connection.create_channel().await?;
533
534 channel
536 .queue_declare(
537 queue,
538 QueueDeclareOptions {
539 durable: true,
540 ..Default::default()
541 },
542 FieldTable::default(),
543 )
544 .await?;
545
546 let host = self
548 .connection
549 .url()
550 .parse::<Url>()
551 .ok()
552 .and_then(|url| url.host_str().map(|h| h.to_string()))
553 .unwrap_or_else(|| "localhost".to_string());
554
555 let envelope = MassTransitEnvelope::with_message_type(message, message_type)
557 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
558 .with_source_address(format!("rabbitmq://{}/{}", host, queue))
559 .with_destination_address(format!("rabbitmq://{}/{}", host, queue));
560
561 let payload = serde_json::to_vec(&envelope)
563 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
564
565 let options = options.unwrap_or_default();
567 let mut properties = BasicProperties::default()
568 .with_content_type("application/json".into())
569 .with_delivery_mode(2) .with_headers(FieldTable::default());
571
572 if let Some(expiration) = options.expiration {
573 properties = properties.with_expiration(expiration.into());
574 }
575
576 if let Some(priority) = options.priority {
577 properties = properties.with_priority(priority);
578 }
579
580 let confirm = channel
582 .basic_publish(
583 "", queue,
585 BasicPublishOptions {
586 mandatory: options.mandatory,
587 immediate: options.immediate,
588 },
589 &payload,
590 properties,
591 )
592 .await?;
593
594 confirm.await?;
595
596 debug!(
597 "Published MassTransit message to queue '{}' (type: {})",
598 queue, message_type
599 );
600
601 Ok(())
602 }
603
604 pub async fn publish_masstransit_envelope_to_exchange(
607 &self,
608 exchange: &str,
609 routing_key: &str,
610 envelope: &MassTransitEnvelope,
611 options: Option<PublishOptions>,
612 ) -> Result<(), RustRabbitError> {
613 let channel = self.connection.create_channel().await?;
614
615 channel
617 .exchange_declare(
618 exchange,
619 ExchangeKind::Topic,
620 ExchangeDeclareOptions {
621 durable: true,
622 ..Default::default()
623 },
624 FieldTable::default(),
625 )
626 .await?;
627
628 let payload = serde_json::to_vec(envelope)
630 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
631
632 let options = options.unwrap_or_default();
634 let mut properties = BasicProperties::default()
635 .with_content_type("application/json".into())
636 .with_delivery_mode(2)
637 .with_headers(FieldTable::default());
638
639 if let Some(expiration) = options.expiration {
640 properties = properties.with_expiration(expiration.into());
641 }
642
643 if let Some(priority) = options.priority {
644 properties = properties.with_priority(priority);
645 }
646
647 let confirm = channel
648 .basic_publish(
649 exchange,
650 routing_key,
651 BasicPublishOptions {
652 mandatory: options.mandatory,
653 immediate: options.immediate,
654 },
655 &payload,
656 properties,
657 )
658 .await?;
659
660 confirm.await?;
661
662 debug!(
663 "Published MassTransit envelope to exchange '{}' with routing key '{}'",
664 exchange, routing_key
665 );
666
667 Ok(())
668 }
669
670 pub async fn publish_masstransit_envelope_to_queue(
673 &self,
674 queue: &str,
675 envelope: &MassTransitEnvelope,
676 options: Option<PublishOptions>,
677 ) -> Result<(), RustRabbitError> {
678 let channel = self.connection.create_channel().await?;
679
680 channel
682 .queue_declare(
683 queue,
684 QueueDeclareOptions {
685 durable: true,
686 ..Default::default()
687 },
688 FieldTable::default(),
689 )
690 .await?;
691
692 let payload = serde_json::to_vec(envelope)
694 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
695
696 let options = options.unwrap_or_default();
698 let mut properties = BasicProperties::default()
699 .with_content_type("application/json".into())
700 .with_delivery_mode(2)
701 .with_headers(FieldTable::default());
702
703 if let Some(expiration) = options.expiration {
704 properties = properties.with_expiration(expiration.into());
705 }
706
707 if let Some(priority) = options.priority {
708 properties = properties.with_priority(priority);
709 }
710
711 let confirm = channel
712 .basic_publish(
713 "",
714 queue,
715 BasicPublishOptions {
716 mandatory: options.mandatory,
717 immediate: options.immediate,
718 },
719 &payload,
720 properties,
721 )
722 .await?;
723
724 confirm.await?;
725
726 debug!("Published MassTransit envelope to queue '{}'", queue);
727
728 Ok(())
729 }
730}