rust_rabbit/
publisher.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{MessageEnvelope, WireMessage},
5};
6use lapin::{
7    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
8    types::FieldTable,
9    BasicProperties, Channel, ExchangeKind,
10};
11use serde::Serialize;
12use std::sync::Arc;
13use tracing::debug;
14
/// Per-message settings applied when a message is published.
///
/// Values are assembled fluently: start from [`PublishOptions::new`] (or
/// `Default::default()`) and chain the setter methods, each of which consumes
/// the options and returns the updated value.
#[derive(Debug, Clone, Default)]
pub struct PublishOptions {
    /// Passed through as the `mandatory` flag of the publish call.
    pub mandatory: bool,
    /// Passed through as the `immediate` flag of the publish call. No builder
    /// method sets it; assign the field directly if it is needed.
    // NOTE(review): RabbitMQ 3.0+ reportedly rejects publishes with `immediate`
    // set — confirm against the target broker before enabling.
    pub immediate: bool,
    /// Value for the message's `expiration` property, when present.
    pub expiration: Option<String>,
    /// Value for the message's `priority` property, when present.
    pub priority: Option<u8>,
}

impl PublishOptions {
    /// Returns options with both flags off and no expiration or priority.
    pub fn new() -> Self {
        Self {
            mandatory: false,
            immediate: false,
            expiration: None,
            priority: None,
        }
    }

    /// Switches the `mandatory` flag on.
    pub fn mandatory(self) -> Self {
        Self {
            mandatory: true,
            ..self
        }
    }

    /// Sets the message priority, replacing any previous value.
    pub fn priority(self, priority: u8) -> Self {
        Self {
            priority: Some(priority),
            ..self
        }
    }

    /// Sets the message expiration, replacing any previous value.
    ///
    /// The string is stored as given; no validation is performed here.
    pub fn with_expiration(self, expiration: impl Into<String>) -> Self {
        let expiration = Some(expiration.into());
        Self { expiration, ..self }
    }

    /// Same effect as [`PublishOptions::priority`]; kept under the `with_` name.
    pub fn with_priority(self, priority: u8) -> Self {
        self.priority(priority)
    }
}
49
50/// Simplified Publisher for message publishing
51pub struct Publisher {
52    connection: Arc<Connection>,
53}
54
55impl Publisher {
56    /// Create a new publisher
57    pub fn new(connection: Arc<Connection>) -> Self {
58        Self { connection }
59    }
60
61    /// Publish message to an exchange
62    pub async fn publish_to_exchange<T>(
63        &self,
64        exchange: &str,
65        routing_key: &str,
66        message: &T,
67        options: Option<PublishOptions>,
68    ) -> Result<(), RustRabbitError>
69    where
70        T: Serialize,
71    {
72        let channel = self.connection.create_channel().await?;
73
74        // Declare exchange (simplified - always topic for flexibility)
75        channel
76            .exchange_declare(
77                exchange,
78                ExchangeKind::Topic,
79                ExchangeDeclareOptions {
80                    durable: true,
81                    ..Default::default()
82                },
83                FieldTable::default(),
84            )
85            .await?;
86
87        self.publish_message(&channel, exchange, routing_key, message, options)
88            .await
89    }
90
91    /// Publish message directly to a queue
92    pub async fn publish_to_queue<T>(
93        &self,
94        queue: &str,
95        message: &T,
96        options: Option<PublishOptions>,
97    ) -> Result<(), RustRabbitError>
98    where
99        T: Serialize,
100    {
101        let channel = self.connection.create_channel().await?;
102
103        // Declare queue
104        channel
105            .queue_declare(
106                queue,
107                QueueDeclareOptions {
108                    durable: true,
109                    ..Default::default()
110                },
111                FieldTable::default(),
112            )
113            .await?;
114
115        // Publish to default exchange with queue name as routing key
116        self.publish_message(&channel, "", queue, message, options)
117            .await
118    }
119
120    /// Internal method to publish message
121    async fn publish_message<T>(
122        &self,
123        channel: &Channel,
124        exchange: &str,
125        routing_key: &str,
126        message: &T,
127        options: Option<PublishOptions>,
128    ) -> Result<(), RustRabbitError>
129    where
130        T: Serialize,
131    {
132        // Wrap in WireMessage format
133        let wire_message = WireMessage {
134            data: message,
135            retry_attempt: 0,
136        };
137
138        // Serialize message
139        let payload = serde_json::to_vec(&wire_message)
140            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
141
142        // Build properties
143        let options = options.unwrap_or_default();
144        let mut properties = BasicProperties::default()
145            .with_content_type("application/json".into())
146            .with_delivery_mode(2); // Persistent
147
148        if let Some(expiration) = options.expiration {
149            properties = properties.with_expiration(expiration.into());
150        }
151
152        if let Some(priority) = options.priority {
153            properties = properties.with_priority(priority);
154        }
155
156        // Publish message
157        let confirm = channel
158            .basic_publish(
159                exchange,
160                routing_key,
161                BasicPublishOptions {
162                    mandatory: options.mandatory,
163                    immediate: options.immediate,
164                },
165                &payload,
166                properties,
167            )
168            .await?;
169
170        // Wait for confirmation (simplified)
171        confirm.await?;
172
173        debug!(
174            "Published message to exchange '{}' with routing key '{}'",
175            exchange, routing_key
176        );
177
178        Ok(())
179    }
180
181    /// Publish a message envelope to an exchange (includes retry metadata)
182    pub async fn publish_envelope_to_exchange<T>(
183        &self,
184        exchange: &str,
185        routing_key: &str,
186        envelope: &MessageEnvelope<T>,
187        options: Option<PublishOptions>,
188    ) -> Result<(), RustRabbitError>
189    where
190        T: Serialize,
191    {
192        self.publish_to_exchange(exchange, routing_key, envelope, options)
193            .await
194    }
195
196    /// Publish a message envelope directly to a queue (includes retry metadata)
197    pub async fn publish_envelope_to_queue<T>(
198        &self,
199        queue: &str,
200        envelope: &MessageEnvelope<T>,
201        options: Option<PublishOptions>,
202    ) -> Result<(), RustRabbitError>
203    where
204        T: Serialize,
205    {
206        self.publish_to_queue(queue, envelope, options).await
207    }
208
209    /// Create a message envelope with source tracking and publish to exchange
210    pub async fn publish_with_envelope<T>(
211        &self,
212        exchange: &str,
213        routing_key: &str,
214        payload: &T,
215        source_queue: &str,
216        max_retries: u32,
217        options: Option<PublishOptions>,
218    ) -> Result<(), RustRabbitError>
219    where
220        T: Serialize + Clone,
221    {
222        let envelope = MessageEnvelope::with_source(
223            payload.clone(),
224            source_queue,
225            Some(exchange),
226            Some(routing_key),
227            Some("rust-rabbit-publisher"), // Publisher identifier
228        )
229        .with_max_retries(max_retries);
230
231        self.publish_envelope_to_exchange(exchange, routing_key, &envelope, options)
232            .await
233    }
234
235    /// Create a message envelope and publish directly to queue
236    pub async fn publish_with_envelope_to_queue<T>(
237        &self,
238        queue: &str,
239        payload: &T,
240        max_retries: u32,
241        options: Option<PublishOptions>,
242    ) -> Result<(), RustRabbitError>
243    where
244        T: Serialize + Clone,
245    {
246        let envelope = MessageEnvelope::new(payload.clone(), queue).with_max_retries(max_retries);
247
248        self.publish_envelope_to_queue(queue, &envelope, options)
249            .await
250    }
251}