1use crate::{connection::Connection, error::RustRabbitError};
2use lapin::{
3 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
4 types::FieldTable,
5 BasicProperties, Channel, ExchangeKind,
6};
7use serde::Serialize;
8use std::sync::Arc;
9use tracing::debug;
10
/// Per-message settings applied when a message is published.
///
/// Start from [`PublishOptions::new`] (all flags off, no expiration, no priority) and chain
/// the setters, or fill in the public fields directly.
#[derive(Debug, Clone, Default)]
pub struct PublishOptions {
    /// Forwarded as the `mandatory` flag of the publish call.
    pub mandatory: bool,
    /// Forwarded as the `immediate` flag of the publish call.
    ///
    /// No setter exists for this flag; assign the field directly if it is needed.
    pub immediate: bool,
    /// Expiration value copied onto the message properties when present.
    ///
    /// NOTE(review): presumably a TTL in milliseconds encoded as a decimal string —
    /// confirm against the broker documentation.
    pub expiration: Option<String>,
    /// Priority copied onto the message properties when present.
    pub priority: Option<u8>,
}

impl PublishOptions {
    /// Returns the default options: both flags `false`, no expiration, no priority.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the `mandatory` flag and returns the updated options.
    #[must_use = "setters return the updated options instead of mutating in place"]
    pub fn mandatory(mut self) -> Self {
        self.mandatory = true;
        self
    }

    /// Stores `expiration` (converted into a `String`) and returns the updated options.
    #[must_use = "setters return the updated options instead of mutating in place"]
    pub fn with_expiration(mut self, expiration: impl Into<String>) -> Self {
        self.expiration = Some(expiration.into());
        self
    }

    /// Stores `priority` and returns the updated options.
    #[must_use = "setters return the updated options instead of mutating in place"]
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = Some(priority);
        self
    }
}
40
41pub struct Publisher {
43 connection: Arc<Connection>,
44}
45
46impl Publisher {
47 pub fn new(connection: Arc<Connection>) -> Self {
49 Self { connection }
50 }
51
52 pub async fn publish_to_exchange<T>(
54 &self,
55 exchange: &str,
56 routing_key: &str,
57 message: &T,
58 options: Option<PublishOptions>,
59 ) -> Result<(), RustRabbitError>
60 where
61 T: Serialize,
62 {
63 let channel = self.connection.create_channel().await?;
64
65 channel
67 .exchange_declare(
68 exchange,
69 ExchangeKind::Topic,
70 ExchangeDeclareOptions {
71 durable: true,
72 ..Default::default()
73 },
74 FieldTable::default(),
75 )
76 .await?;
77
78 self.publish_message(&channel, exchange, routing_key, message, options)
79 .await
80 }
81
82 pub async fn publish_to_queue<T>(
84 &self,
85 queue: &str,
86 message: &T,
87 options: Option<PublishOptions>,
88 ) -> Result<(), RustRabbitError>
89 where
90 T: Serialize,
91 {
92 let channel = self.connection.create_channel().await?;
93
94 channel
96 .queue_declare(
97 queue,
98 QueueDeclareOptions {
99 durable: true,
100 ..Default::default()
101 },
102 FieldTable::default(),
103 )
104 .await?;
105
106 self.publish_message(&channel, "", queue, message, options)
108 .await
109 }
110
111 async fn publish_message<T>(
113 &self,
114 channel: &Channel,
115 exchange: &str,
116 routing_key: &str,
117 message: &T,
118 options: Option<PublishOptions>,
119 ) -> Result<(), RustRabbitError>
120 where
121 T: Serialize,
122 {
123 let payload = serde_json::to_vec(message)
125 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
126
127 let options = options.unwrap_or_default();
129 let mut properties = BasicProperties::default()
130 .with_content_type("application/json".into())
131 .with_delivery_mode(2); if let Some(expiration) = options.expiration {
134 properties = properties.with_expiration(expiration.into());
135 }
136
137 if let Some(priority) = options.priority {
138 properties = properties.with_priority(priority);
139 }
140
141 let confirm = channel
143 .basic_publish(
144 exchange,
145 routing_key,
146 BasicPublishOptions {
147 mandatory: options.mandatory,
148 immediate: options.immediate,
149 },
150 &payload,
151 properties,
152 )
153 .await?;
154
155 confirm.await?;
157
158 debug!(
159 "Published message to exchange '{}' with routing key '{}'",
160 exchange, routing_key
161 );
162
163 Ok(())
164 }
165}