1use crate::{
2 connection::ConnectionManager,
3 error::{RabbitError, Result},
4 metrics::{MetricsTimer, RustRabbitMetrics},
5};
6use lapin::{
7 options::{
8 BasicPublishOptions, ExchangeDeclareOptions as LapinExchangeDeclareOptions,
9 QueueDeclareOptions as LapinQueueDeclareOptions,
10 },
11 types::FieldTable,
12 BasicProperties, Channel, ExchangeKind,
13};
14use serde::Serialize;
15use std::{collections::HashMap, time::Duration};
16use tracing::debug;
17use uuid::Uuid;
18
19#[derive(Debug, Clone)]
21pub struct Publisher {
22 connection_manager: ConnectionManager,
23 metrics: Option<RustRabbitMetrics>,
24}
25
26impl Publisher {
27 pub fn new(connection_manager: ConnectionManager) -> Self {
29 Self {
30 connection_manager,
31 metrics: None,
32 }
33 }
34
35 pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
37 self.metrics = Some(metrics);
38 }
39
40 pub async fn publish_to_queue<T>(
42 &self,
43 queue_name: &str,
44 message: &T,
45 options: Option<PublishOptions>,
46 ) -> Result<()>
47 where
48 T: Serialize,
49 {
50 let timer = MetricsTimer::new();
51 let channel = self.get_channel().await?;
52
53 let options = options.unwrap_or_default();
55 if options.auto_declare_queue {
56 self.declare_queue(&channel, queue_name, &options.queue_options)
57 .await?;
58 }
59
60 let payload = self.serialize_message(message)?;
61 let properties = self.build_basic_properties(&options)?;
62
63 let result = channel
64 .basic_publish(
65 "", queue_name,
67 BasicPublishOptions::default(),
68 &payload,
69 properties,
70 )
71 .await;
72
73 if let Some(metrics) = &self.metrics {
75 match &result {
76 Ok(_) => {
77 metrics.record_message_published(queue_name, "", queue_name);
78 metrics.record_publish_duration(queue_name, "", timer.elapsed());
79 }
80 Err(_) => {
81 metrics.record_publish_duration(queue_name, "", timer.elapsed());
83 }
84 }
85 }
86
87 result?;
88 debug!("Published message to queue: {}", queue_name);
89 Ok(())
90 }
91
92 pub async fn publish_to_exchange<T>(
94 &self,
95 exchange_name: &str,
96 routing_key: &str,
97 message: &T,
98 options: Option<PublishOptions>,
99 ) -> Result<()>
100 where
101 T: Serialize,
102 {
103 let timer = MetricsTimer::new();
104 let channel = self.get_channel().await?;
105
106 let options = options.unwrap_or_default();
108 if options.auto_declare_exchange {
109 self.declare_exchange(&channel, exchange_name, &options.exchange_options)
110 .await?;
111 }
112
113 let payload = self.serialize_message(message)?;
114 let properties = self.build_basic_properties(&options)?;
115
116 let result = channel
117 .basic_publish(
118 exchange_name,
119 routing_key,
120 BasicPublishOptions::default(),
121 &payload,
122 properties,
123 )
124 .await;
125
126 if let Some(metrics) = &self.metrics {
128 match &result {
129 Ok(_) => {
130 metrics.record_message_published("", exchange_name, routing_key);
131 metrics.record_publish_duration("", exchange_name, timer.elapsed());
132 }
133 Err(_) => {
134 metrics.record_publish_duration("", exchange_name, timer.elapsed());
135 }
136 }
137 }
138
139 result?;
140 debug!(
141 "Published message to exchange: {} with routing key: {}",
142 exchange_name, routing_key
143 );
144 Ok(())
145 }
146
147 pub async fn publish_delayed<T>(
149 &self,
150 exchange_name: &str,
151 routing_key: &str,
152 message: &T,
153 delay: Duration,
154 options: Option<PublishOptions>,
155 ) -> Result<()>
156 where
157 T: Serialize,
158 {
159 let channel = self.get_channel().await?;
160
161 let options = options.unwrap_or_default();
162
163 if options.auto_declare_exchange {
165 let mut exchange_opts = options.exchange_options.clone();
166 exchange_opts.exchange_type = ExchangeKind::Custom("x-delayed-message".to_string());
167
168 self.declare_exchange(&channel, exchange_name, &exchange_opts)
170 .await?;
171 }
172
173 let payload = self.serialize_message(message)?;
174 let mut properties = self.build_basic_properties(&options)?;
175
176 let mut headers = properties.headers().clone().unwrap_or_default();
178 headers.insert(
179 "x-delay".into(),
180 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
181 );
182 properties = properties.with_headers(headers);
183
184 channel
185 .basic_publish(
186 exchange_name,
187 routing_key,
188 BasicPublishOptions::default(),
189 &payload,
190 properties,
191 )
192 .await?;
193
194 debug!(
195 "Published delayed message to exchange: {} with delay: {:?}",
196 exchange_name, delay
197 );
198 Ok(())
199 }
200
201 pub async fn publish_with_ttl<T>(
203 &self,
204 exchange_name: &str,
205 routing_key: &str,
206 message: &T,
207 ttl: Duration,
208 options: Option<PublishOptions>,
209 ) -> Result<()>
210 where
211 T: Serialize,
212 {
213 let mut options = options.unwrap_or_default();
214 options.ttl = Some(ttl);
215
216 self.publish_to_exchange(exchange_name, routing_key, message, Some(options))
217 .await
218 }
219
220 async fn get_channel(&self) -> Result<Channel> {
222 let connection = self.connection_manager.get_connection().await?;
223 connection.create_channel().await
224 }
225
226 fn serialize_message<T>(&self, message: &T) -> Result<Vec<u8>>
228 where
229 T: Serialize,
230 {
231 serde_json::to_vec(message).map_err(RabbitError::Serialization)
232 }
233
234 fn build_basic_properties(&self, options: &PublishOptions) -> Result<BasicProperties> {
236 let mut properties = BasicProperties::default()
237 .with_content_type("application/json".into())
238 .with_delivery_mode(if options.persistent { 2 } else { 1 });
239
240 if let Some(message_id) = &options.message_id {
241 properties = properties.with_message_id(message_id.clone().into());
242 }
243
244 if let Some(correlation_id) = &options.correlation_id {
245 properties = properties.with_correlation_id(correlation_id.clone().into());
246 }
247
248 if let Some(reply_to) = &options.reply_to {
249 properties = properties.with_reply_to(reply_to.clone().into());
250 }
251
252 if let Some(ttl) = options.ttl {
253 properties = properties.with_expiration(ttl.as_millis().to_string().into());
254 }
255
256 if let Some(priority) = options.priority {
257 properties = properties.with_priority(priority);
258 }
259
260 if !options.headers.is_empty() {
261 let mut field_table = FieldTable::default();
262 for (key, value) in &options.headers {
263 field_table.insert(key.clone().into(), value.clone());
264 }
265 properties = properties.with_headers(field_table);
266 }
267
268 properties = properties.with_timestamp(chrono::Utc::now().timestamp() as u64);
269
270 Ok(properties)
271 }
272
273 async fn declare_queue(
275 &self,
276 channel: &Channel,
277 queue_name: &str,
278 options: &CustomQueueDeclareOptions,
279 ) -> Result<()> {
280 let queue_options = LapinQueueDeclareOptions {
281 passive: options.passive,
282 durable: options.durable,
283 exclusive: options.exclusive,
284 auto_delete: options.auto_delete,
285 nowait: false,
286 };
287
288 channel
289 .queue_declare(queue_name, queue_options, options.arguments.clone())
290 .await?;
291
292 debug!("Declared queue: {}", queue_name);
293 Ok(())
294 }
295
296 async fn declare_exchange(
298 &self,
299 channel: &Channel,
300 exchange_name: &str,
301 options: &CustomExchangeDeclareOptions,
302 ) -> Result<()> {
303 let exchange_kind = match &options.exchange_type {
304 ExchangeKind::Custom(custom_type) => {
305 if custom_type == "x-delayed-message" {
306 let mut arguments = options.arguments.clone();
308 let original_type_str = match &options.original_type {
309 ExchangeKind::Direct => "direct",
310 ExchangeKind::Fanout => "fanout",
311 ExchangeKind::Topic => "topic",
312 ExchangeKind::Headers => "headers",
313 ExchangeKind::Custom(custom) => custom,
314 };
315 arguments.insert(
316 "x-delayed-type".into(),
317 lapin::types::AMQPValue::LongString(original_type_str.into()),
318 );
319
320 let exchange_options = LapinExchangeDeclareOptions {
321 passive: options.passive,
322 durable: options.durable,
323 auto_delete: options.auto_delete,
324 internal: options.internal,
325 nowait: false,
326 };
327
328 channel
329 .exchange_declare(
330 exchange_name,
331 ExchangeKind::Custom("x-delayed-message".to_string()),
332 exchange_options,
333 arguments,
334 )
335 .await?;
336
337 debug!("Declared delayed message exchange: {}", exchange_name);
338 return Ok(());
339 } else {
340 lapin::ExchangeKind::Custom(custom_type.clone())
341 }
342 }
343 other => other.clone(),
344 };
345
346 let exchange_options = LapinExchangeDeclareOptions {
347 passive: options.passive,
348 durable: options.durable,
349 auto_delete: options.auto_delete,
350 internal: options.internal,
351 nowait: false,
352 };
353
354 channel
355 .exchange_declare(
356 exchange_name,
357 exchange_kind,
358 exchange_options,
359 options.arguments.clone(),
360 )
361 .await?;
362
363 debug!(
364 "Declared exchange: {} of type: {:?}",
365 exchange_name, options.exchange_type
366 );
367 Ok(())
368 }
369}
370
371#[derive(Debug, Clone)]
373pub struct PublishOptions {
374 pub persistent: bool,
376
377 pub message_id: Option<String>,
379
380 pub correlation_id: Option<String>,
382
383 pub reply_to: Option<String>,
385
386 pub ttl: Option<Duration>,
388
389 pub priority: Option<u8>,
391
392 pub headers: HashMap<String, lapin::types::AMQPValue>,
394
395 pub auto_declare_queue: bool,
397
398 pub auto_declare_exchange: bool,
400
401 pub queue_options: CustomQueueDeclareOptions,
403
404 pub exchange_options: CustomExchangeDeclareOptions,
406}
407
408impl PublishOptions {
409 pub fn builder() -> PublishOptionsBuilder {
411 PublishOptionsBuilder::new()
412 }
413}
414
415#[derive(Debug, Clone)]
417pub struct PublishOptionsBuilder {
418 persistent: bool,
419 message_id: Option<String>,
420 correlation_id: Option<String>,
421 reply_to: Option<String>,
422 ttl: Option<Duration>,
423 priority: Option<u8>,
424 headers: HashMap<String, lapin::types::AMQPValue>,
425 auto_declare_queue: bool,
426 auto_declare_exchange: bool,
427 queue_options: CustomQueueDeclareOptions,
428 exchange_options: CustomExchangeDeclareOptions,
429}
430
431impl PublishOptionsBuilder {
432 pub fn new() -> Self {
434 Self {
435 persistent: true,
436 message_id: Some(Uuid::new_v4().to_string()),
437 correlation_id: None,
438 reply_to: None,
439 ttl: None,
440 priority: None,
441 headers: HashMap::new(),
442 auto_declare_queue: false,
443 auto_declare_exchange: false,
444 queue_options: CustomQueueDeclareOptions::default(),
445 exchange_options: CustomExchangeDeclareOptions::default(),
446 }
447 }
448
449 pub fn persistent(mut self, persistent: bool) -> Self {
451 self.persistent = persistent;
452 self
453 }
454
455 pub fn durable(mut self) -> Self {
457 self.persistent = true;
458 self
459 }
460
461 pub fn transient(mut self) -> Self {
463 self.persistent = false;
464 self
465 }
466
467 pub fn message_id<S: Into<String>>(mut self, id: S) -> Self {
469 self.message_id = Some(id.into());
470 self
471 }
472
473 pub fn random_message_id(mut self) -> Self {
475 self.message_id = Some(Uuid::new_v4().to_string());
476 self
477 }
478
479 pub fn no_message_id(mut self) -> Self {
481 self.message_id = None;
482 self
483 }
484
485 pub fn correlation_id<S: Into<String>>(mut self, id: S) -> Self {
487 self.correlation_id = Some(id.into());
488 self
489 }
490
491 pub fn reply_to<S: Into<String>>(mut self, queue: S) -> Self {
493 self.reply_to = Some(queue.into());
494 self
495 }
496
497 pub fn ttl(mut self, ttl: Duration) -> Self {
499 self.ttl = Some(ttl);
500 self
501 }
502
503 pub fn priority(mut self, priority: u8) -> Self {
505 self.priority = Some(priority);
506 self
507 }
508
509 pub fn header<S: Into<String>>(mut self, key: S, value: lapin::types::AMQPValue) -> Self {
511 self.headers.insert(key.into(), value);
512 self
513 }
514
515 pub fn header_string<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
517 self.headers.insert(
518 key.into(),
519 lapin::types::AMQPValue::LongString(value.into().into()),
520 );
521 self
522 }
523
524 pub fn header_int<K: Into<String>>(mut self, key: K, value: i64) -> Self {
526 self.headers
527 .insert(key.into(), lapin::types::AMQPValue::LongLongInt(value));
528 self
529 }
530
531 pub fn auto_declare_queue(mut self) -> Self {
533 self.auto_declare_queue = true;
534 self
535 }
536
537 pub fn auto_declare_exchange(mut self) -> Self {
539 self.auto_declare_exchange = true;
540 self
541 }
542
543 pub fn queue_options(mut self, options: CustomQueueDeclareOptions) -> Self {
545 self.queue_options = options;
546 self
547 }
548
549 pub fn exchange_options(mut self, options: CustomExchangeDeclareOptions) -> Self {
551 self.exchange_options = options;
552 self
553 }
554
555 pub fn request_response<S: Into<String>>(mut self, reply_to: S, correlation_id: S) -> Self {
557 self.reply_to = Some(reply_to.into());
558 self.correlation_id = Some(correlation_id.into());
559 self
560 }
561
562 pub fn development(mut self) -> Self {
564 self.auto_declare_queue = true;
565 self.auto_declare_exchange = true;
566 self
567 }
568
569 pub fn production(mut self) -> Self {
571 self.auto_declare_queue = false;
572 self.auto_declare_exchange = false;
573 self.persistent = true;
574 self
575 }
576
577 pub fn build(self) -> PublishOptions {
579 PublishOptions {
580 persistent: self.persistent,
581 message_id: self.message_id,
582 correlation_id: self.correlation_id,
583 reply_to: self.reply_to,
584 ttl: self.ttl,
585 priority: self.priority,
586 headers: self.headers,
587 auto_declare_queue: self.auto_declare_queue,
588 auto_declare_exchange: self.auto_declare_exchange,
589 queue_options: self.queue_options,
590 exchange_options: self.exchange_options,
591 }
592 }
593}
594
595impl Default for PublishOptionsBuilder {
596 fn default() -> Self {
597 Self::new()
598 }
599}
600
601impl Default for PublishOptions {
602 fn default() -> Self {
603 Self {
604 persistent: true,
605 message_id: Some(Uuid::new_v4().to_string()),
606 correlation_id: None,
607 reply_to: None,
608 ttl: None,
609 priority: None,
610 headers: HashMap::new(),
611 auto_declare_queue: false,
612 auto_declare_exchange: false,
613 queue_options: CustomQueueDeclareOptions::default(),
614 exchange_options: CustomExchangeDeclareOptions::default(),
615 }
616 }
617}
618
619#[derive(Debug, Clone)]
621pub struct CustomQueueDeclareOptions {
622 pub passive: bool,
623 pub durable: bool,
624 pub exclusive: bool,
625 pub auto_delete: bool,
626 pub arguments: FieldTable,
627}
628
629impl Default for CustomQueueDeclareOptions {
630 fn default() -> Self {
631 Self {
632 passive: false,
633 durable: true,
634 exclusive: false,
635 auto_delete: false,
636 arguments: FieldTable::default(),
637 }
638 }
639}
640
641#[derive(Debug, Clone)]
643pub struct CustomExchangeDeclareOptions {
644 pub passive: bool,
645 pub durable: bool,
646 pub auto_delete: bool,
647 pub internal: bool,
648 pub exchange_type: ExchangeKind,
649 pub original_type: ExchangeKind, pub arguments: FieldTable,
651}
652
653impl Default for CustomExchangeDeclareOptions {
654 fn default() -> Self {
655 Self {
656 passive: false,
657 durable: true,
658 auto_delete: false,
659 internal: false,
660 exchange_type: ExchangeKind::Direct,
661 original_type: ExchangeKind::Direct,
662 arguments: FieldTable::default(),
663 }
664 }
665}
666
667#[cfg(test)]
668mod tests {
669 use super::*;
670
671 #[test]
672 fn test_publish_options_default() {
673 let options = PublishOptions::default();
674 assert!(options.persistent);
675 assert!(options.message_id.is_some());
676 assert!(options.correlation_id.is_none());
677 assert!(options.reply_to.is_none());
678 assert!(options.ttl.is_none());
679 assert!(options.priority.is_none());
680 assert!(options.headers.is_empty());
681 assert!(!options.auto_declare_queue);
682 assert!(!options.auto_declare_exchange);
683 }
684
685 #[test]
686 fn test_queue_declare_options_default() {
687 let options = CustomQueueDeclareOptions::default();
688 assert!(!options.passive);
689 assert!(options.durable);
690 assert!(!options.exclusive);
691 assert!(!options.auto_delete);
692 }
693
694 #[test]
695 fn test_exchange_declare_options_default() {
696 let options = CustomExchangeDeclareOptions::default();
697 assert!(!options.passive);
698 assert!(options.durable);
699 assert!(!options.auto_delete);
700 assert!(!options.internal);
701 assert!(matches!(options.exchange_type, ExchangeKind::Direct));
702 assert!(matches!(options.original_type, ExchangeKind::Direct));
703 }
704}