1use crate::{
2 connection::ConnectionManager,
3 error::{RabbitError, Result},
4 metrics::{MetricsTimer, RustRabbitMetrics},
5};
6use lapin::{
7 options::{
8 BasicPublishOptions, ExchangeDeclareOptions as LapinExchangeDeclareOptions,
9 QueueDeclareOptions as LapinQueueDeclareOptions,
10 },
11 types::FieldTable,
12 BasicProperties, Channel, ExchangeKind,
13};
14use serde::Serialize;
15use std::{collections::HashMap, time::Duration};
16use tracing::debug;
17use uuid::Uuid;
18
19#[derive(Debug, Clone)]
21pub struct Publisher {
22 connection_manager: ConnectionManager,
23 metrics: Option<RustRabbitMetrics>,
24}
25
26impl Publisher {
27 pub fn new(connection_manager: ConnectionManager) -> Self {
29 Self {
30 connection_manager,
31 metrics: None,
32 }
33 }
34
35 pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
37 self.metrics = Some(metrics);
38 }
39
40 pub async fn publish_to_queue<T>(
42 &self,
43 queue_name: &str,
44 message: &T,
45 options: Option<PublishOptions>,
46 ) -> Result<()>
47 where
48 T: Serialize,
49 {
50 let timer = MetricsTimer::new();
51 let channel = self.get_channel().await?;
52
53 let options = options.unwrap_or_default();
55 if options.auto_declare_queue {
56 self.declare_queue(&channel, queue_name, &options.queue_options)
57 .await?;
58 }
59
60 let payload = self.serialize_message(message)?;
61 let properties = self.build_basic_properties(&options)?;
62
63 let result = channel
64 .basic_publish(
65 "", queue_name,
67 BasicPublishOptions::default(),
68 &payload,
69 properties,
70 )
71 .await;
72
73 if let Some(metrics) = &self.metrics {
75 match &result {
76 Ok(_) => {
77 metrics.record_message_published(queue_name, "", queue_name);
78 metrics.record_publish_duration(queue_name, "", timer.elapsed());
79 }
80 Err(_) => {
81 metrics.record_publish_duration(queue_name, "", timer.elapsed());
83 }
84 }
85 }
86
87 result?;
88 debug!("Published message to queue: {}", queue_name);
89 Ok(())
90 }
91
92 pub async fn publish_to_exchange<T>(
94 &self,
95 exchange_name: &str,
96 routing_key: &str,
97 message: &T,
98 options: Option<PublishOptions>,
99 ) -> Result<()>
100 where
101 T: Serialize,
102 {
103 let timer = MetricsTimer::new();
104 let channel = self.get_channel().await?;
105
106 let options = options.unwrap_or_default();
108 if options.auto_declare_exchange {
109 self.declare_exchange(&channel, exchange_name, &options.exchange_options)
110 .await?;
111 }
112
113 let payload = self.serialize_message(message)?;
114 let properties = self.build_basic_properties(&options)?;
115
116 let result = channel
117 .basic_publish(
118 exchange_name,
119 routing_key,
120 BasicPublishOptions::default(),
121 &payload,
122 properties,
123 )
124 .await;
125
126 if let Some(metrics) = &self.metrics {
128 match &result {
129 Ok(_) => {
130 metrics.record_message_published("", exchange_name, routing_key);
131 metrics.record_publish_duration("", exchange_name, timer.elapsed());
132 }
133 Err(_) => {
134 metrics.record_publish_duration("", exchange_name, timer.elapsed());
135 }
136 }
137 }
138
139 result?;
140 debug!(
141 "Published message to exchange: {} with routing key: {}",
142 exchange_name, routing_key
143 );
144 Ok(())
145 }
146
147 pub async fn publish_delayed<T>(
149 &self,
150 exchange_name: &str,
151 routing_key: &str,
152 message: &T,
153 delay: Duration,
154 options: Option<PublishOptions>,
155 ) -> Result<()>
156 where
157 T: Serialize,
158 {
159 let channel = self.get_channel().await?;
160
161 let options = options.unwrap_or_default();
162
163 if options.auto_declare_exchange {
165 let mut exchange_opts = options.exchange_options.clone();
166 exchange_opts.exchange_type = ExchangeKind::Custom("x-delayed-message".to_string());
167
168 self.declare_exchange(&channel, exchange_name, &exchange_opts)
170 .await?;
171 }
172
173 let payload = self.serialize_message(message)?;
174 let mut properties = self.build_basic_properties(&options)?;
175
176 let mut headers = properties.headers().clone().unwrap_or_default();
178 headers.insert(
179 "x-delay".into(),
180 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
181 );
182 properties = properties.with_headers(headers);
183
184 channel
185 .basic_publish(
186 exchange_name,
187 routing_key,
188 BasicPublishOptions::default(),
189 &payload,
190 properties,
191 )
192 .await?;
193
194 debug!(
195 "Published delayed message to exchange: {} with delay: {:?}",
196 exchange_name, delay
197 );
198 Ok(())
199 }
200
201 pub async fn publish_with_ttl<T>(
203 &self,
204 exchange_name: &str,
205 routing_key: &str,
206 message: &T,
207 ttl: Duration,
208 options: Option<PublishOptions>,
209 ) -> Result<()>
210 where
211 T: Serialize,
212 {
213 let mut options = options.unwrap_or_default();
214 options.ttl = Some(ttl);
215
216 self.publish_to_exchange(exchange_name, routing_key, message, Some(options))
217 .await
218 }
219
220 async fn get_channel(&self) -> Result<Channel> {
222 let connection = self.connection_manager.get_connection().await?;
223 connection.create_channel().await
224 }
225
226 pub async fn get_connection(&self) -> Result<std::sync::Arc<crate::connection::Connection>> {
228 self.connection_manager.get_connection().await
229 }
230
231 fn serialize_message<T>(&self, message: &T) -> Result<Vec<u8>>
233 where
234 T: Serialize,
235 {
236 serde_json::to_vec(message).map_err(RabbitError::Serialization)
237 }
238
239 fn build_basic_properties(&self, options: &PublishOptions) -> Result<BasicProperties> {
241 let mut properties = BasicProperties::default()
242 .with_content_type("application/json".into())
243 .with_delivery_mode(if options.persistent { 2 } else { 1 });
244
245 if let Some(message_id) = &options.message_id {
246 properties = properties.with_message_id(message_id.clone().into());
247 }
248
249 if let Some(correlation_id) = &options.correlation_id {
250 properties = properties.with_correlation_id(correlation_id.clone().into());
251 }
252
253 if let Some(reply_to) = &options.reply_to {
254 properties = properties.with_reply_to(reply_to.clone().into());
255 }
256
257 if let Some(ttl) = options.ttl {
258 properties = properties.with_expiration(ttl.as_millis().to_string().into());
259 }
260
261 if let Some(priority) = options.priority {
262 properties = properties.with_priority(priority);
263 }
264
265 if !options.headers.is_empty() {
266 let mut field_table = FieldTable::default();
267 for (key, value) in &options.headers {
268 field_table.insert(key.clone().into(), value.clone());
269 }
270 properties = properties.with_headers(field_table);
271 }
272
273 properties = properties.with_timestamp(chrono::Utc::now().timestamp() as u64);
274
275 Ok(properties)
276 }
277
278 async fn declare_queue(
280 &self,
281 channel: &Channel,
282 queue_name: &str,
283 options: &CustomQueueDeclareOptions,
284 ) -> Result<()> {
285 let queue_options = LapinQueueDeclareOptions {
286 passive: options.passive,
287 durable: options.durable,
288 exclusive: options.exclusive,
289 auto_delete: options.auto_delete,
290 nowait: false,
291 };
292
293 channel
294 .queue_declare(queue_name, queue_options, options.arguments.clone())
295 .await?;
296
297 debug!("Declared queue: {}", queue_name);
298 Ok(())
299 }
300
301 async fn declare_exchange(
303 &self,
304 channel: &Channel,
305 exchange_name: &str,
306 options: &CustomExchangeDeclareOptions,
307 ) -> Result<()> {
308 let exchange_kind = match &options.exchange_type {
309 ExchangeKind::Custom(custom_type) => {
310 if custom_type == "x-delayed-message" {
311 let mut arguments = options.arguments.clone();
313 let original_type_str = match &options.original_type {
314 ExchangeKind::Direct => "direct",
315 ExchangeKind::Fanout => "fanout",
316 ExchangeKind::Topic => "topic",
317 ExchangeKind::Headers => "headers",
318 ExchangeKind::Custom(custom) => custom,
319 };
320 arguments.insert(
321 "x-delayed-type".into(),
322 lapin::types::AMQPValue::LongString(original_type_str.into()),
323 );
324
325 let exchange_options = LapinExchangeDeclareOptions {
326 passive: options.passive,
327 durable: options.durable,
328 auto_delete: options.auto_delete,
329 internal: options.internal,
330 nowait: false,
331 };
332
333 channel
334 .exchange_declare(
335 exchange_name,
336 ExchangeKind::Custom("x-delayed-message".to_string()),
337 exchange_options,
338 arguments,
339 )
340 .await?;
341
342 debug!("Declared delayed message exchange: {}", exchange_name);
343 return Ok(());
344 } else {
345 lapin::ExchangeKind::Custom(custom_type.clone())
346 }
347 }
348 other => other.clone(),
349 };
350
351 let exchange_options = LapinExchangeDeclareOptions {
352 passive: options.passive,
353 durable: options.durable,
354 auto_delete: options.auto_delete,
355 internal: options.internal,
356 nowait: false,
357 };
358
359 channel
360 .exchange_declare(
361 exchange_name,
362 exchange_kind,
363 exchange_options,
364 options.arguments.clone(),
365 )
366 .await?;
367
368 debug!(
369 "Declared exchange: {} of type: {:?}",
370 exchange_name, options.exchange_type
371 );
372 Ok(())
373 }
374}
375
376#[derive(Debug, Clone)]
378pub struct PublishOptions {
379 pub persistent: bool,
381
382 pub message_id: Option<String>,
384
385 pub correlation_id: Option<String>,
387
388 pub reply_to: Option<String>,
390
391 pub ttl: Option<Duration>,
393
394 pub priority: Option<u8>,
396
397 pub headers: HashMap<String, lapin::types::AMQPValue>,
399
400 pub auto_declare_queue: bool,
402
403 pub auto_declare_exchange: bool,
405
406 pub queue_options: CustomQueueDeclareOptions,
408
409 pub exchange_options: CustomExchangeDeclareOptions,
411}
412
413impl PublishOptions {
414 pub fn builder() -> PublishOptionsBuilder {
416 PublishOptionsBuilder::new()
417 }
418}
419
420#[derive(Debug, Clone)]
422pub struct PublishOptionsBuilder {
423 persistent: bool,
424 message_id: Option<String>,
425 correlation_id: Option<String>,
426 reply_to: Option<String>,
427 ttl: Option<Duration>,
428 priority: Option<u8>,
429 headers: HashMap<String, lapin::types::AMQPValue>,
430 auto_declare_queue: bool,
431 auto_declare_exchange: bool,
432 queue_options: CustomQueueDeclareOptions,
433 exchange_options: CustomExchangeDeclareOptions,
434}
435
436impl PublishOptionsBuilder {
437 pub fn new() -> Self {
439 Self {
440 persistent: true,
441 message_id: Some(Uuid::new_v4().to_string()),
442 correlation_id: None,
443 reply_to: None,
444 ttl: None,
445 priority: None,
446 headers: HashMap::new(),
447 auto_declare_queue: false,
448 auto_declare_exchange: false,
449 queue_options: CustomQueueDeclareOptions::default(),
450 exchange_options: CustomExchangeDeclareOptions::default(),
451 }
452 }
453
454 pub fn persistent(mut self, persistent: bool) -> Self {
456 self.persistent = persistent;
457 self
458 }
459
460 pub fn durable(mut self) -> Self {
462 self.persistent = true;
463 self
464 }
465
466 pub fn transient(mut self) -> Self {
468 self.persistent = false;
469 self
470 }
471
472 pub fn message_id<S: Into<String>>(mut self, id: S) -> Self {
474 self.message_id = Some(id.into());
475 self
476 }
477
478 pub fn random_message_id(mut self) -> Self {
480 self.message_id = Some(Uuid::new_v4().to_string());
481 self
482 }
483
484 pub fn no_message_id(mut self) -> Self {
486 self.message_id = None;
487 self
488 }
489
490 pub fn correlation_id<S: Into<String>>(mut self, id: S) -> Self {
492 self.correlation_id = Some(id.into());
493 self
494 }
495
496 pub fn reply_to<S: Into<String>>(mut self, queue: S) -> Self {
498 self.reply_to = Some(queue.into());
499 self
500 }
501
502 pub fn ttl(mut self, ttl: Duration) -> Self {
504 self.ttl = Some(ttl);
505 self
506 }
507
508 pub fn priority(mut self, priority: u8) -> Self {
510 self.priority = Some(priority);
511 self
512 }
513
514 pub fn header<S: Into<String>>(mut self, key: S, value: lapin::types::AMQPValue) -> Self {
516 self.headers.insert(key.into(), value);
517 self
518 }
519
520 pub fn header_string<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
522 self.headers.insert(
523 key.into(),
524 lapin::types::AMQPValue::LongString(value.into().into()),
525 );
526 self
527 }
528
529 pub fn header_int<K: Into<String>>(mut self, key: K, value: i64) -> Self {
531 self.headers
532 .insert(key.into(), lapin::types::AMQPValue::LongLongInt(value));
533 self
534 }
535
536 pub fn auto_declare_queue(mut self) -> Self {
538 self.auto_declare_queue = true;
539 self
540 }
541
542 pub fn auto_declare_exchange(mut self) -> Self {
544 self.auto_declare_exchange = true;
545 self
546 }
547
548 pub fn queue_options(mut self, options: CustomQueueDeclareOptions) -> Self {
550 self.queue_options = options;
551 self
552 }
553
554 pub fn exchange_options(mut self, options: CustomExchangeDeclareOptions) -> Self {
556 self.exchange_options = options;
557 self
558 }
559
560 pub fn request_response<S: Into<String>>(mut self, reply_to: S, correlation_id: S) -> Self {
562 self.reply_to = Some(reply_to.into());
563 self.correlation_id = Some(correlation_id.into());
564 self
565 }
566
567 pub fn development(mut self) -> Self {
569 self.auto_declare_queue = true;
570 self.auto_declare_exchange = true;
571 self
572 }
573
574 pub fn production(mut self) -> Self {
576 self.auto_declare_queue = false;
577 self.auto_declare_exchange = false;
578 self.persistent = true;
579 self
580 }
581
582 pub fn build(self) -> PublishOptions {
584 PublishOptions {
585 persistent: self.persistent,
586 message_id: self.message_id,
587 correlation_id: self.correlation_id,
588 reply_to: self.reply_to,
589 ttl: self.ttl,
590 priority: self.priority,
591 headers: self.headers,
592 auto_declare_queue: self.auto_declare_queue,
593 auto_declare_exchange: self.auto_declare_exchange,
594 queue_options: self.queue_options,
595 exchange_options: self.exchange_options,
596 }
597 }
598}
599
600impl Default for PublishOptionsBuilder {
601 fn default() -> Self {
602 Self::new()
603 }
604}
605
606impl Default for PublishOptions {
607 fn default() -> Self {
608 Self {
609 persistent: true,
610 message_id: Some(Uuid::new_v4().to_string()),
611 correlation_id: None,
612 reply_to: None,
613 ttl: None,
614 priority: None,
615 headers: HashMap::new(),
616 auto_declare_queue: false,
617 auto_declare_exchange: false,
618 queue_options: CustomQueueDeclareOptions::default(),
619 exchange_options: CustomExchangeDeclareOptions::default(),
620 }
621 }
622}
623
624#[derive(Debug, Clone)]
626pub struct CustomQueueDeclareOptions {
627 pub passive: bool,
628 pub durable: bool,
629 pub exclusive: bool,
630 pub auto_delete: bool,
631 pub arguments: FieldTable,
632}
633
634impl Default for CustomQueueDeclareOptions {
635 fn default() -> Self {
636 Self {
637 passive: false,
638 durable: true,
639 exclusive: false,
640 auto_delete: false,
641 arguments: FieldTable::default(),
642 }
643 }
644}
645
646#[derive(Debug, Clone)]
648pub struct CustomExchangeDeclareOptions {
649 pub passive: bool,
650 pub durable: bool,
651 pub auto_delete: bool,
652 pub internal: bool,
653 pub exchange_type: ExchangeKind,
654 pub original_type: ExchangeKind, pub arguments: FieldTable,
656}
657
658impl Default for CustomExchangeDeclareOptions {
659 fn default() -> Self {
660 Self {
661 passive: false,
662 durable: true,
663 auto_delete: false,
664 internal: false,
665 exchange_type: ExchangeKind::Direct,
666 original_type: ExchangeKind::Direct,
667 arguments: FieldTable::default(),
668 }
669 }
670}
671
#[cfg(test)]
mod tests {
    use super::*;

    /// Default publish options are persistent, carry a message id and leave
    /// everything else unset or disabled.
    #[test]
    fn test_publish_options_default() {
        let PublishOptions {
            persistent,
            message_id,
            correlation_id,
            reply_to,
            ttl,
            priority,
            headers,
            auto_declare_queue,
            auto_declare_exchange,
            ..
        } = PublishOptions::default();

        assert!(persistent);
        assert!(message_id.is_some());
        assert_eq!(correlation_id, None);
        assert_eq!(reply_to, None);
        assert_eq!(ttl, None);
        assert_eq!(priority, None);
        assert_eq!(headers.len(), 0);
        assert!(!auto_declare_queue);
        assert!(!auto_declare_exchange);
    }

    /// Default queue options are durable with all other flags off.
    #[test]
    fn test_queue_declare_options_default() {
        let CustomQueueDeclareOptions {
            passive,
            durable,
            exclusive,
            auto_delete,
            ..
        } = CustomQueueDeclareOptions::default();

        assert!(!passive);
        assert!(durable);
        assert!(!exclusive);
        assert!(!auto_delete);
    }

    /// Default exchange options describe a durable direct exchange.
    #[test]
    fn test_exchange_declare_options_default() {
        let CustomExchangeDeclareOptions {
            passive,
            durable,
            auto_delete,
            internal,
            exchange_type,
            original_type,
            ..
        } = CustomExchangeDeclareOptions::default();

        assert!(!passive);
        assert!(durable);
        assert!(!auto_delete);
        assert!(!internal);
        assert!(matches!(exchange_type, ExchangeKind::Direct));
        assert!(matches!(original_type, ExchangeKind::Direct));
    }
}