1use crate::{connection::Connection, error::RustRabbitError, message::MessageEnvelope};
2use lapin::{
3 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
4 types::FieldTable,
5 BasicProperties, Channel, ExchangeKind,
6};
7use serde::Serialize;
8use std::sync::Arc;
9use tracing::debug;
10
11#[derive(Debug, Clone, Default)]
13pub struct PublishOptions {
14 pub mandatory: bool,
15 pub immediate: bool,
16 pub expiration: Option<String>,
17 pub priority: Option<u8>,
18}
19
20impl PublishOptions {
21 pub fn new() -> Self {
22 Self::default()
23 }
24
25 pub fn mandatory(mut self) -> Self {
26 self.mandatory = true;
27 self
28 }
29
30 pub fn priority(mut self, priority: u8) -> Self {
31 self.priority = Some(priority);
32 self
33 }
34
35 pub fn with_expiration(mut self, expiration: impl Into<String>) -> Self {
36 self.expiration = Some(expiration.into());
37 self
38 }
39
40 pub fn with_priority(mut self, priority: u8) -> Self {
41 self.priority = Some(priority);
42 self
43 }
44}
45
46pub struct Publisher {
48 connection: Arc<Connection>,
49}
50
51impl Publisher {
52 pub fn new(connection: Arc<Connection>) -> Self {
54 Self { connection }
55 }
56
57 pub async fn publish_to_exchange<T>(
59 &self,
60 exchange: &str,
61 routing_key: &str,
62 message: &T,
63 options: Option<PublishOptions>,
64 ) -> Result<(), RustRabbitError>
65 where
66 T: Serialize,
67 {
68 let channel = self.connection.create_channel().await?;
69
70 channel
72 .exchange_declare(
73 exchange,
74 ExchangeKind::Topic,
75 ExchangeDeclareOptions {
76 durable: true,
77 ..Default::default()
78 },
79 FieldTable::default(),
80 )
81 .await?;
82
83 self.publish_message(&channel, exchange, routing_key, message, options)
84 .await
85 }
86
87 pub async fn publish_to_queue<T>(
89 &self,
90 queue: &str,
91 message: &T,
92 options: Option<PublishOptions>,
93 ) -> Result<(), RustRabbitError>
94 where
95 T: Serialize,
96 {
97 let channel = self.connection.create_channel().await?;
98
99 channel
101 .queue_declare(
102 queue,
103 QueueDeclareOptions {
104 durable: true,
105 ..Default::default()
106 },
107 FieldTable::default(),
108 )
109 .await?;
110
111 self.publish_message(&channel, "", queue, message, options)
113 .await
114 }
115
116 async fn publish_message<T>(
118 &self,
119 channel: &Channel,
120 exchange: &str,
121 routing_key: &str,
122 message: &T,
123 options: Option<PublishOptions>,
124 ) -> Result<(), RustRabbitError>
125 where
126 T: Serialize,
127 {
128 let payload = serde_json::to_vec(message)
130 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
131
132 let options = options.unwrap_or_default();
134 let mut properties = BasicProperties::default()
135 .with_content_type("application/json".into())
136 .with_delivery_mode(2); if let Some(expiration) = options.expiration {
139 properties = properties.with_expiration(expiration.into());
140 }
141
142 if let Some(priority) = options.priority {
143 properties = properties.with_priority(priority);
144 }
145
146 let confirm = channel
148 .basic_publish(
149 exchange,
150 routing_key,
151 BasicPublishOptions {
152 mandatory: options.mandatory,
153 immediate: options.immediate,
154 },
155 &payload,
156 properties,
157 )
158 .await?;
159
160 confirm.await?;
162
163 debug!(
164 "Published message to exchange '{}' with routing key '{}'",
165 exchange, routing_key
166 );
167
168 Ok(())
169 }
170
171 pub async fn publish_envelope_to_exchange<T>(
173 &self,
174 exchange: &str,
175 routing_key: &str,
176 envelope: &MessageEnvelope<T>,
177 options: Option<PublishOptions>,
178 ) -> Result<(), RustRabbitError>
179 where
180 T: Serialize,
181 {
182 self.publish_to_exchange(exchange, routing_key, envelope, options)
183 .await
184 }
185
186 pub async fn publish_envelope_to_queue<T>(
188 &self,
189 queue: &str,
190 envelope: &MessageEnvelope<T>,
191 options: Option<PublishOptions>,
192 ) -> Result<(), RustRabbitError>
193 where
194 T: Serialize,
195 {
196 self.publish_to_queue(queue, envelope, options).await
197 }
198
199 pub async fn publish_with_envelope<T>(
201 &self,
202 exchange: &str,
203 routing_key: &str,
204 payload: &T,
205 source_queue: &str,
206 max_retries: u32,
207 options: Option<PublishOptions>,
208 ) -> Result<(), RustRabbitError>
209 where
210 T: Serialize + Clone,
211 {
212 let envelope = MessageEnvelope::with_source(
213 payload.clone(),
214 source_queue,
215 Some(exchange),
216 Some(routing_key),
217 Some("rust-rabbit-publisher"), )
219 .with_max_retries(max_retries);
220
221 self.publish_envelope_to_exchange(exchange, routing_key, &envelope, options)
222 .await
223 }
224
225 pub async fn publish_with_envelope_to_queue<T>(
227 &self,
228 queue: &str,
229 payload: &T,
230 max_retries: u32,
231 options: Option<PublishOptions>,
232 ) -> Result<(), RustRabbitError>
233 where
234 T: Serialize + Clone,
235 {
236 let envelope = MessageEnvelope::new(payload.clone(), queue).with_max_retries(max_retries);
237
238 self.publish_envelope_to_queue(queue, &envelope, options)
239 .await
240 }
241}