rust_rabbit/
publisher.rs

1use crate::{connection::Connection, error::RustRabbitError, message::MessageEnvelope};
2use lapin::{
3    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
4    types::FieldTable,
5    BasicProperties, Channel, ExchangeKind,
6};
7use serde::Serialize;
8use std::sync::Arc;
9use tracing::debug;
10
/// Optional per-message settings applied when a message is published.
///
/// Values are built by chaining the consuming setters, for example
/// `PublishOptions::new().mandatory().with_priority(5)`. Every setter takes
/// `self` by value and returns the updated options, so the result must be used.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PublishOptions {
    /// Forwarded as the `mandatory` flag of the publish call. Defaults to `false`.
    pub mandatory: bool,
    /// Forwarded as the `immediate` flag of the publish call. Defaults to `false`.
    ///
    /// No builder method sets this flag; it can only be changed through the field.
    /// NOTE(review): RabbitMQ is understood to reject `immediate` publishes —
    /// confirm against the broker in use before enabling it.
    pub immediate: bool,
    /// Expiration copied into the message properties when set.
    ///
    /// NOTE(review): presumably a TTL in milliseconds written as a decimal
    /// string — confirm against the broker documentation.
    pub expiration: Option<String>,
    /// Priority copied into the message properties when set.
    pub priority: Option<u8>,
}

impl PublishOptions {
    /// Creates options with both flags off and neither expiration nor priority set.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Turns on the `mandatory` flag.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn mandatory(mut self) -> Self {
        self.mandatory = true;
        self
    }

    /// Sets the message priority.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn priority(mut self, priority: u8) -> Self {
        self.priority = Some(priority);
        self
    }

    /// Sets the message expiration, converting the argument into a `String`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_expiration(mut self, expiration: impl Into<String>) -> Self {
        self.expiration = Some(expiration.into());
        self
    }

    /// Sets the message priority. Equivalent to the `priority` method and kept
    /// so both naming styles continue to work.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_priority(self, priority: u8) -> Self {
        // Delegate so the two spellings can never drift apart.
        self.priority(priority)
    }
}
45
46/// Simplified Publisher for message publishing
47pub struct Publisher {
48    connection: Arc<Connection>,
49}
50
51impl Publisher {
52    /// Create a new publisher
53    pub fn new(connection: Arc<Connection>) -> Self {
54        Self { connection }
55    }
56
57    /// Publish message to an exchange
58    pub async fn publish_to_exchange<T>(
59        &self,
60        exchange: &str,
61        routing_key: &str,
62        message: &T,
63        options: Option<PublishOptions>,
64    ) -> Result<(), RustRabbitError>
65    where
66        T: Serialize,
67    {
68        let channel = self.connection.create_channel().await?;
69
70        // Declare exchange (simplified - always topic for flexibility)
71        channel
72            .exchange_declare(
73                exchange,
74                ExchangeKind::Topic,
75                ExchangeDeclareOptions {
76                    durable: true,
77                    ..Default::default()
78                },
79                FieldTable::default(),
80            )
81            .await?;
82
83        self.publish_message(&channel, exchange, routing_key, message, options)
84            .await
85    }
86
87    /// Publish message directly to a queue
88    pub async fn publish_to_queue<T>(
89        &self,
90        queue: &str,
91        message: &T,
92        options: Option<PublishOptions>,
93    ) -> Result<(), RustRabbitError>
94    where
95        T: Serialize,
96    {
97        let channel = self.connection.create_channel().await?;
98
99        // Declare queue
100        channel
101            .queue_declare(
102                queue,
103                QueueDeclareOptions {
104                    durable: true,
105                    ..Default::default()
106                },
107                FieldTable::default(),
108            )
109            .await?;
110
111        // Publish to default exchange with queue name as routing key
112        self.publish_message(&channel, "", queue, message, options)
113            .await
114    }
115
116    /// Internal method to publish message
117    async fn publish_message<T>(
118        &self,
119        channel: &Channel,
120        exchange: &str,
121        routing_key: &str,
122        message: &T,
123        options: Option<PublishOptions>,
124    ) -> Result<(), RustRabbitError>
125    where
126        T: Serialize,
127    {
128        // Serialize message
129        let payload = serde_json::to_vec(message)
130            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
131
132        // Build properties
133        let options = options.unwrap_or_default();
134        let mut properties = BasicProperties::default()
135            .with_content_type("application/json".into())
136            .with_delivery_mode(2); // Persistent
137
138        if let Some(expiration) = options.expiration {
139            properties = properties.with_expiration(expiration.into());
140        }
141
142        if let Some(priority) = options.priority {
143            properties = properties.with_priority(priority);
144        }
145
146        // Publish message
147        let confirm = channel
148            .basic_publish(
149                exchange,
150                routing_key,
151                BasicPublishOptions {
152                    mandatory: options.mandatory,
153                    immediate: options.immediate,
154                },
155                &payload,
156                properties,
157            )
158            .await?;
159
160        // Wait for confirmation (simplified)
161        confirm.await?;
162
163        debug!(
164            "Published message to exchange '{}' with routing key '{}'",
165            exchange, routing_key
166        );
167
168        Ok(())
169    }
170
171    /// Publish a message envelope to an exchange (includes retry metadata)
172    pub async fn publish_envelope_to_exchange<T>(
173        &self,
174        exchange: &str,
175        routing_key: &str,
176        envelope: &MessageEnvelope<T>,
177        options: Option<PublishOptions>,
178    ) -> Result<(), RustRabbitError>
179    where
180        T: Serialize,
181    {
182        self.publish_to_exchange(exchange, routing_key, envelope, options)
183            .await
184    }
185
186    /// Publish a message envelope directly to a queue (includes retry metadata)
187    pub async fn publish_envelope_to_queue<T>(
188        &self,
189        queue: &str,
190        envelope: &MessageEnvelope<T>,
191        options: Option<PublishOptions>,
192    ) -> Result<(), RustRabbitError>
193    where
194        T: Serialize,
195    {
196        self.publish_to_queue(queue, envelope, options).await
197    }
198
199    /// Create a message envelope with source tracking and publish to exchange
200    pub async fn publish_with_envelope<T>(
201        &self,
202        exchange: &str,
203        routing_key: &str,
204        payload: &T,
205        source_queue: &str,
206        max_retries: u32,
207        options: Option<PublishOptions>,
208    ) -> Result<(), RustRabbitError>
209    where
210        T: Serialize + Clone,
211    {
212        let envelope = MessageEnvelope::with_source(
213            payload.clone(),
214            source_queue,
215            Some(exchange),
216            Some(routing_key),
217            Some("rust-rabbit-publisher"), // Publisher identifier
218        )
219        .with_max_retries(max_retries);
220
221        self.publish_envelope_to_exchange(exchange, routing_key, &envelope, options)
222            .await
223    }
224
225    /// Create a message envelope and publish directly to queue
226    pub async fn publish_with_envelope_to_queue<T>(
227        &self,
228        queue: &str,
229        payload: &T,
230        max_retries: u32,
231        options: Option<PublishOptions>,
232    ) -> Result<(), RustRabbitError>
233    where
234        T: Serialize + Clone,
235    {
236        let envelope = MessageEnvelope::new(payload.clone(), queue).with_max_retries(max_retries);
237
238        self.publish_envelope_to_queue(queue, &envelope, options)
239            .await
240    }
241}