1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{MessageEnvelope, WireMessage},
5};
6use lapin::{
7 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
8 types::FieldTable,
9 BasicProperties, Channel, ExchangeKind,
10};
11use serde::Serialize;
12use std::sync::Arc;
13use tracing::debug;
14
15#[derive(Debug, Clone, Default)]
17pub struct PublishOptions {
18 pub mandatory: bool,
19 pub immediate: bool,
20 pub expiration: Option<String>,
21 pub priority: Option<u8>,
22}
23
24impl PublishOptions {
25 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn mandatory(mut self) -> Self {
30 self.mandatory = true;
31 self
32 }
33
34 pub fn priority(mut self, priority: u8) -> Self {
35 self.priority = Some(priority);
36 self
37 }
38
39 pub fn with_expiration(mut self, expiration: impl Into<String>) -> Self {
40 self.expiration = Some(expiration.into());
41 self
42 }
43
44 pub fn with_priority(mut self, priority: u8) -> Self {
45 self.priority = Some(priority);
46 self
47 }
48}
49
50pub struct Publisher {
52 connection: Arc<Connection>,
53}
54
55impl Publisher {
56 pub fn new(connection: Arc<Connection>) -> Self {
58 Self { connection }
59 }
60
61 pub async fn publish_to_exchange<T>(
63 &self,
64 exchange: &str,
65 routing_key: &str,
66 message: &T,
67 options: Option<PublishOptions>,
68 ) -> Result<(), RustRabbitError>
69 where
70 T: Serialize,
71 {
72 let channel = self.connection.create_channel().await?;
73
74 channel
76 .exchange_declare(
77 exchange,
78 ExchangeKind::Topic,
79 ExchangeDeclareOptions {
80 durable: true,
81 ..Default::default()
82 },
83 FieldTable::default(),
84 )
85 .await?;
86
87 self.publish_message(&channel, exchange, routing_key, message, options)
88 .await
89 }
90
91 pub async fn publish_to_queue<T>(
93 &self,
94 queue: &str,
95 message: &T,
96 options: Option<PublishOptions>,
97 ) -> Result<(), RustRabbitError>
98 where
99 T: Serialize,
100 {
101 let channel = self.connection.create_channel().await?;
102
103 channel
105 .queue_declare(
106 queue,
107 QueueDeclareOptions {
108 durable: true,
109 ..Default::default()
110 },
111 FieldTable::default(),
112 )
113 .await?;
114
115 self.publish_message(&channel, "", queue, message, options)
117 .await
118 }
119
120 async fn publish_message<T>(
122 &self,
123 channel: &Channel,
124 exchange: &str,
125 routing_key: &str,
126 message: &T,
127 options: Option<PublishOptions>,
128 ) -> Result<(), RustRabbitError>
129 where
130 T: Serialize,
131 {
132 let wire_message = WireMessage {
134 data: message,
135 retry_attempt: 0,
136 };
137
138 let payload = serde_json::to_vec(&wire_message)
140 .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
141
142 let options = options.unwrap_or_default();
144 let mut properties = BasicProperties::default()
145 .with_content_type("application/json".into())
146 .with_delivery_mode(2); if let Some(expiration) = options.expiration {
149 properties = properties.with_expiration(expiration.into());
150 }
151
152 if let Some(priority) = options.priority {
153 properties = properties.with_priority(priority);
154 }
155
156 let confirm = channel
158 .basic_publish(
159 exchange,
160 routing_key,
161 BasicPublishOptions {
162 mandatory: options.mandatory,
163 immediate: options.immediate,
164 },
165 &payload,
166 properties,
167 )
168 .await?;
169
170 confirm.await?;
172
173 debug!(
174 "Published message to exchange '{}' with routing key '{}'",
175 exchange, routing_key
176 );
177
178 Ok(())
179 }
180
181 pub async fn publish_envelope_to_exchange<T>(
183 &self,
184 exchange: &str,
185 routing_key: &str,
186 envelope: &MessageEnvelope<T>,
187 options: Option<PublishOptions>,
188 ) -> Result<(), RustRabbitError>
189 where
190 T: Serialize,
191 {
192 self.publish_to_exchange(exchange, routing_key, envelope, options)
193 .await
194 }
195
196 pub async fn publish_envelope_to_queue<T>(
198 &self,
199 queue: &str,
200 envelope: &MessageEnvelope<T>,
201 options: Option<PublishOptions>,
202 ) -> Result<(), RustRabbitError>
203 where
204 T: Serialize,
205 {
206 self.publish_to_queue(queue, envelope, options).await
207 }
208
209 pub async fn publish_with_envelope<T>(
211 &self,
212 exchange: &str,
213 routing_key: &str,
214 payload: &T,
215 source_queue: &str,
216 max_retries: u32,
217 options: Option<PublishOptions>,
218 ) -> Result<(), RustRabbitError>
219 where
220 T: Serialize + Clone,
221 {
222 let envelope = MessageEnvelope::with_source(
223 payload.clone(),
224 source_queue,
225 Some(exchange),
226 Some(routing_key),
227 Some("rust-rabbit-publisher"), )
229 .with_max_retries(max_retries);
230
231 self.publish_envelope_to_exchange(exchange, routing_key, &envelope, options)
232 .await
233 }
234
235 pub async fn publish_with_envelope_to_queue<T>(
237 &self,
238 queue: &str,
239 payload: &T,
240 max_retries: u32,
241 options: Option<PublishOptions>,
242 ) -> Result<(), RustRabbitError>
243 where
244 T: Serialize + Clone,
245 {
246 let envelope = MessageEnvelope::new(payload.clone(), queue).with_max_retries(max_retries);
247
248 self.publish_envelope_to_queue(queue, &envelope, options)
249 .await
250 }
251}