rust_rabbit/
publisher.rs

1use crate::{
2    connection::ConnectionManager,
3    error::{RabbitError, Result},
4    metrics::{MetricsTimer, RustRabbitMetrics},
5};
6use lapin::{
7    options::{
8        BasicPublishOptions, ExchangeDeclareOptions as LapinExchangeDeclareOptions,
9        QueueDeclareOptions as LapinQueueDeclareOptions,
10    },
11    types::FieldTable,
12    BasicProperties, Channel, ExchangeKind,
13};
14use serde::Serialize;
15use std::{collections::HashMap, time::Duration};
16use tracing::debug;
17use uuid::Uuid;
18
19/// Publisher for sending messages to RabbitMQ
20#[derive(Debug, Clone)]
21pub struct Publisher {
22    connection_manager: ConnectionManager,
23    metrics: Option<RustRabbitMetrics>,
24}
25
26impl Publisher {
27    /// Create a new publisher
28    pub fn new(connection_manager: ConnectionManager) -> Self {
29        Self {
30            connection_manager,
31            metrics: None,
32        }
33    }
34
35    /// Set metrics for this publisher
36    pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
37        self.metrics = Some(metrics);
38    }
39
40    /// Publish a message to a queue
41    pub async fn publish_to_queue<T>(
42        &self,
43        queue_name: &str,
44        message: &T,
45        options: Option<PublishOptions>,
46    ) -> Result<()>
47    where
48        T: Serialize,
49    {
50        let timer = MetricsTimer::new();
51        let channel = self.get_channel().await?;
52
53        // Declare queue if auto_declare is enabled
54        let options = options.unwrap_or_default();
55        if options.auto_declare_queue {
56            self.declare_queue(&channel, queue_name, &options.queue_options)
57                .await?;
58        }
59
60        let payload = self.serialize_message(message)?;
61        let properties = self.build_basic_properties(&options)?;
62
63        let result = channel
64            .basic_publish(
65                "", // default exchange
66                queue_name,
67                BasicPublishOptions::default(),
68                &payload,
69                properties,
70            )
71            .await;
72
73        // Record metrics
74        if let Some(metrics) = &self.metrics {
75            match &result {
76                Ok(_) => {
77                    metrics.record_message_published(queue_name, "", queue_name);
78                    metrics.record_publish_duration(queue_name, "", timer.elapsed());
79                }
80                Err(_) => {
81                    // Error will be handled by the caller, but we can still record timing
82                    metrics.record_publish_duration(queue_name, "", timer.elapsed());
83                }
84            }
85        }
86
87        result?;
88        debug!("Published message to queue: {}", queue_name);
89        Ok(())
90    }
91
92    /// Publish a message to an exchange
93    pub async fn publish_to_exchange<T>(
94        &self,
95        exchange_name: &str,
96        routing_key: &str,
97        message: &T,
98        options: Option<PublishOptions>,
99    ) -> Result<()>
100    where
101        T: Serialize,
102    {
103        let timer = MetricsTimer::new();
104        let channel = self.get_channel().await?;
105
106        // Declare exchange if auto_declare is enabled
107        let options = options.unwrap_or_default();
108        if options.auto_declare_exchange {
109            self.declare_exchange(&channel, exchange_name, &options.exchange_options)
110                .await?;
111        }
112
113        let payload = self.serialize_message(message)?;
114        let properties = self.build_basic_properties(&options)?;
115
116        let result = channel
117            .basic_publish(
118                exchange_name,
119                routing_key,
120                BasicPublishOptions::default(),
121                &payload,
122                properties,
123            )
124            .await;
125
126        // Record metrics
127        if let Some(metrics) = &self.metrics {
128            match &result {
129                Ok(_) => {
130                    metrics.record_message_published("", exchange_name, routing_key);
131                    metrics.record_publish_duration("", exchange_name, timer.elapsed());
132                }
133                Err(_) => {
134                    metrics.record_publish_duration("", exchange_name, timer.elapsed());
135                }
136            }
137        }
138
139        result?;
140        debug!(
141            "Published message to exchange: {} with routing key: {}",
142            exchange_name, routing_key
143        );
144        Ok(())
145    }
146
147    /// Publish a delayed message using the delayed message exchange plugin
148    pub async fn publish_delayed<T>(
149        &self,
150        exchange_name: &str,
151        routing_key: &str,
152        message: &T,
153        delay: Duration,
154        options: Option<PublishOptions>,
155    ) -> Result<()>
156    where
157        T: Serialize,
158    {
159        let channel = self.get_channel().await?;
160
161        let options = options.unwrap_or_default();
162
163        // Ensure the exchange is declared as delayed type
164        if options.auto_declare_exchange {
165            let mut exchange_opts = options.exchange_options.clone();
166            exchange_opts.exchange_type = ExchangeKind::Custom("x-delayed-message".to_string());
167
168            // We'll handle arguments in the declare_exchange method
169            self.declare_exchange(&channel, exchange_name, &exchange_opts)
170                .await?;
171        }
172
173        let payload = self.serialize_message(message)?;
174        let mut properties = self.build_basic_properties(&options)?;
175
176        // Add delay header
177        let mut headers = properties.headers().clone().unwrap_or_default();
178        headers.insert(
179            "x-delay".into(),
180            lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
181        );
182        properties = properties.with_headers(headers);
183
184        channel
185            .basic_publish(
186                exchange_name,
187                routing_key,
188                BasicPublishOptions::default(),
189                &payload,
190                properties,
191            )
192            .await?;
193
194        debug!(
195            "Published delayed message to exchange: {} with delay: {:?}",
196            exchange_name, delay
197        );
198        Ok(())
199    }
200
201    /// Publish a message with TTL (Time To Live)
202    pub async fn publish_with_ttl<T>(
203        &self,
204        exchange_name: &str,
205        routing_key: &str,
206        message: &T,
207        ttl: Duration,
208        options: Option<PublishOptions>,
209    ) -> Result<()>
210    where
211        T: Serialize,
212    {
213        let mut options = options.unwrap_or_default();
214        options.ttl = Some(ttl);
215
216        self.publish_to_exchange(exchange_name, routing_key, message, Some(options))
217            .await
218    }
219
220    /// Get a channel from the connection manager
221    async fn get_channel(&self) -> Result<Channel> {
222        let connection = self.connection_manager.get_connection().await?;
223        connection.create_channel().await
224    }
225
226    /// Get connection from the connection manager (public method for consumer)
227    pub async fn get_connection(&self) -> Result<std::sync::Arc<crate::connection::Connection>> {
228        self.connection_manager.get_connection().await
229    }
230
231    /// Serialize message to bytes
232    fn serialize_message<T>(&self, message: &T) -> Result<Vec<u8>>
233    where
234        T: Serialize,
235    {
236        serde_json::to_vec(message).map_err(RabbitError::Serialization)
237    }
238
239    /// Build BasicProperties from PublishOptions
240    fn build_basic_properties(&self, options: &PublishOptions) -> Result<BasicProperties> {
241        let mut properties = BasicProperties::default()
242            .with_content_type("application/json".into())
243            .with_delivery_mode(if options.persistent { 2 } else { 1 });
244
245        if let Some(message_id) = &options.message_id {
246            properties = properties.with_message_id(message_id.clone().into());
247        }
248
249        if let Some(correlation_id) = &options.correlation_id {
250            properties = properties.with_correlation_id(correlation_id.clone().into());
251        }
252
253        if let Some(reply_to) = &options.reply_to {
254            properties = properties.with_reply_to(reply_to.clone().into());
255        }
256
257        if let Some(ttl) = options.ttl {
258            properties = properties.with_expiration(ttl.as_millis().to_string().into());
259        }
260
261        if let Some(priority) = options.priority {
262            properties = properties.with_priority(priority);
263        }
264
265        if !options.headers.is_empty() {
266            let mut field_table = FieldTable::default();
267            for (key, value) in &options.headers {
268                field_table.insert(key.clone().into(), value.clone());
269            }
270            properties = properties.with_headers(field_table);
271        }
272
273        properties = properties.with_timestamp(chrono::Utc::now().timestamp() as u64);
274
275        Ok(properties)
276    }
277
278    /// Declare a queue
279    async fn declare_queue(
280        &self,
281        channel: &Channel,
282        queue_name: &str,
283        options: &CustomQueueDeclareOptions,
284    ) -> Result<()> {
285        let queue_options = LapinQueueDeclareOptions {
286            passive: options.passive,
287            durable: options.durable,
288            exclusive: options.exclusive,
289            auto_delete: options.auto_delete,
290            nowait: false,
291        };
292
293        channel
294            .queue_declare(queue_name, queue_options, options.arguments.clone())
295            .await?;
296
297        debug!("Declared queue: {}", queue_name);
298        Ok(())
299    }
300
301    /// Declare an exchange
302    async fn declare_exchange(
303        &self,
304        channel: &Channel,
305        exchange_name: &str,
306        options: &CustomExchangeDeclareOptions,
307    ) -> Result<()> {
308        let exchange_kind = match &options.exchange_type {
309            ExchangeKind::Custom(custom_type) => {
310                if custom_type == "x-delayed-message" {
311                    // For delayed message exchange, we need special handling
312                    let mut arguments = options.arguments.clone();
313                    let original_type_str = match &options.original_type {
314                        ExchangeKind::Direct => "direct",
315                        ExchangeKind::Fanout => "fanout",
316                        ExchangeKind::Topic => "topic",
317                        ExchangeKind::Headers => "headers",
318                        ExchangeKind::Custom(custom) => custom,
319                    };
320                    arguments.insert(
321                        "x-delayed-type".into(),
322                        lapin::types::AMQPValue::LongString(original_type_str.into()),
323                    );
324
325                    let exchange_options = LapinExchangeDeclareOptions {
326                        passive: options.passive,
327                        durable: options.durable,
328                        auto_delete: options.auto_delete,
329                        internal: options.internal,
330                        nowait: false,
331                    };
332
333                    channel
334                        .exchange_declare(
335                            exchange_name,
336                            ExchangeKind::Custom("x-delayed-message".to_string()),
337                            exchange_options,
338                            arguments,
339                        )
340                        .await?;
341
342                    debug!("Declared delayed message exchange: {}", exchange_name);
343                    return Ok(());
344                } else {
345                    lapin::ExchangeKind::Custom(custom_type.clone())
346                }
347            }
348            other => other.clone(),
349        };
350
351        let exchange_options = LapinExchangeDeclareOptions {
352            passive: options.passive,
353            durable: options.durable,
354            auto_delete: options.auto_delete,
355            internal: options.internal,
356            nowait: false,
357        };
358
359        channel
360            .exchange_declare(
361                exchange_name,
362                exchange_kind,
363                exchange_options,
364                options.arguments.clone(),
365            )
366            .await?;
367
368        debug!(
369            "Declared exchange: {} of type: {:?}",
370            exchange_name, options.exchange_type
371        );
372        Ok(())
373    }
374}
375
376/// Options for publishing messages
377#[derive(Debug, Clone)]
378pub struct PublishOptions {
379    /// Whether the message should be persistent
380    pub persistent: bool,
381
382    /// Message ID
383    pub message_id: Option<String>,
384
385    /// Correlation ID for request-response patterns
386    pub correlation_id: Option<String>,
387
388    /// Reply-to queue for RPC patterns
389    pub reply_to: Option<String>,
390
391    /// Message Time To Live
392    pub ttl: Option<Duration>,
393
394    /// Message priority (0-255)
395    pub priority: Option<u8>,
396
397    /// Custom headers
398    pub headers: HashMap<String, lapin::types::AMQPValue>,
399
400    /// Auto-declare queue before publishing
401    pub auto_declare_queue: bool,
402
403    /// Auto-declare exchange before publishing
404    pub auto_declare_exchange: bool,
405
406    /// Queue declaration options
407    pub queue_options: CustomQueueDeclareOptions,
408
409    /// Exchange declaration options
410    pub exchange_options: CustomExchangeDeclareOptions,
411}
412
413impl PublishOptions {
414    /// Create a new publish options builder
415    pub fn builder() -> PublishOptionsBuilder {
416        PublishOptionsBuilder::new()
417    }
418}
419
420/// Builder for PublishOptions
421#[derive(Debug, Clone)]
422pub struct PublishOptionsBuilder {
423    persistent: bool,
424    message_id: Option<String>,
425    correlation_id: Option<String>,
426    reply_to: Option<String>,
427    ttl: Option<Duration>,
428    priority: Option<u8>,
429    headers: HashMap<String, lapin::types::AMQPValue>,
430    auto_declare_queue: bool,
431    auto_declare_exchange: bool,
432    queue_options: CustomQueueDeclareOptions,
433    exchange_options: CustomExchangeDeclareOptions,
434}
435
436impl PublishOptionsBuilder {
437    /// Create a new builder with default values
438    pub fn new() -> Self {
439        Self {
440            persistent: true,
441            message_id: Some(Uuid::new_v4().to_string()),
442            correlation_id: None,
443            reply_to: None,
444            ttl: None,
445            priority: None,
446            headers: HashMap::new(),
447            auto_declare_queue: false,
448            auto_declare_exchange: false,
449            queue_options: CustomQueueDeclareOptions::default(),
450            exchange_options: CustomExchangeDeclareOptions::default(),
451        }
452    }
453
454    /// Set message persistence
455    pub fn persistent(mut self, persistent: bool) -> Self {
456        self.persistent = persistent;
457        self
458    }
459
460    /// Make message persistent
461    pub fn durable(mut self) -> Self {
462        self.persistent = true;
463        self
464    }
465
466    /// Make message non-persistent
467    pub fn transient(mut self) -> Self {
468        self.persistent = false;
469        self
470    }
471
472    /// Set message ID
473    pub fn message_id<S: Into<String>>(mut self, id: S) -> Self {
474        self.message_id = Some(id.into());
475        self
476    }
477
478    /// Generate random message ID
479    pub fn random_message_id(mut self) -> Self {
480        self.message_id = Some(Uuid::new_v4().to_string());
481        self
482    }
483
484    /// Clear message ID
485    pub fn no_message_id(mut self) -> Self {
486        self.message_id = None;
487        self
488    }
489
490    /// Set correlation ID
491    pub fn correlation_id<S: Into<String>>(mut self, id: S) -> Self {
492        self.correlation_id = Some(id.into());
493        self
494    }
495
496    /// Set reply-to queue
497    pub fn reply_to<S: Into<String>>(mut self, queue: S) -> Self {
498        self.reply_to = Some(queue.into());
499        self
500    }
501
502    /// Set message TTL
503    pub fn ttl(mut self, ttl: Duration) -> Self {
504        self.ttl = Some(ttl);
505        self
506    }
507
508    /// Set message priority
509    pub fn priority(mut self, priority: u8) -> Self {
510        self.priority = Some(priority);
511        self
512    }
513
514    /// Add a custom header
515    pub fn header<S: Into<String>>(mut self, key: S, value: lapin::types::AMQPValue) -> Self {
516        self.headers.insert(key.into(), value);
517        self
518    }
519
520    /// Add a string header
521    pub fn header_string<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
522        self.headers.insert(
523            key.into(),
524            lapin::types::AMQPValue::LongString(value.into().into()),
525        );
526        self
527    }
528
529    /// Add an integer header
530    pub fn header_int<K: Into<String>>(mut self, key: K, value: i64) -> Self {
531        self.headers
532            .insert(key.into(), lapin::types::AMQPValue::LongLongInt(value));
533        self
534    }
535
536    /// Enable auto-declare queue
537    pub fn auto_declare_queue(mut self) -> Self {
538        self.auto_declare_queue = true;
539        self
540    }
541
542    /// Enable auto-declare exchange
543    pub fn auto_declare_exchange(mut self) -> Self {
544        self.auto_declare_exchange = true;
545        self
546    }
547
548    /// Set queue options
549    pub fn queue_options(mut self, options: CustomQueueDeclareOptions) -> Self {
550        self.queue_options = options;
551        self
552    }
553
554    /// Set exchange options
555    pub fn exchange_options(mut self, options: CustomExchangeDeclareOptions) -> Self {
556        self.exchange_options = options;
557        self
558    }
559
560    /// Configure for request-response pattern
561    pub fn request_response<S: Into<String>>(mut self, reply_to: S, correlation_id: S) -> Self {
562        self.reply_to = Some(reply_to.into());
563        self.correlation_id = Some(correlation_id.into());
564        self
565    }
566
567    /// Configure for development (auto-declare everything)
568    pub fn development(mut self) -> Self {
569        self.auto_declare_queue = true;
570        self.auto_declare_exchange = true;
571        self
572    }
573
574    /// Configure for production (no auto-declare)
575    pub fn production(mut self) -> Self {
576        self.auto_declare_queue = false;
577        self.auto_declare_exchange = false;
578        self.persistent = true;
579        self
580    }
581
582    /// Build the final options
583    pub fn build(self) -> PublishOptions {
584        PublishOptions {
585            persistent: self.persistent,
586            message_id: self.message_id,
587            correlation_id: self.correlation_id,
588            reply_to: self.reply_to,
589            ttl: self.ttl,
590            priority: self.priority,
591            headers: self.headers,
592            auto_declare_queue: self.auto_declare_queue,
593            auto_declare_exchange: self.auto_declare_exchange,
594            queue_options: self.queue_options,
595            exchange_options: self.exchange_options,
596        }
597    }
598}
599
600impl Default for PublishOptionsBuilder {
601    fn default() -> Self {
602        Self::new()
603    }
604}
605
606impl Default for PublishOptions {
607    fn default() -> Self {
608        Self {
609            persistent: true,
610            message_id: Some(Uuid::new_v4().to_string()),
611            correlation_id: None,
612            reply_to: None,
613            ttl: None,
614            priority: None,
615            headers: HashMap::new(),
616            auto_declare_queue: false,
617            auto_declare_exchange: false,
618            queue_options: CustomQueueDeclareOptions::default(),
619            exchange_options: CustomExchangeDeclareOptions::default(),
620        }
621    }
622}
623
624/// Custom Queue declaration options (wrapper around lapin's options)
625#[derive(Debug, Clone)]
626pub struct CustomQueueDeclareOptions {
627    pub passive: bool,
628    pub durable: bool,
629    pub exclusive: bool,
630    pub auto_delete: bool,
631    pub arguments: FieldTable,
632}
633
634impl Default for CustomQueueDeclareOptions {
635    fn default() -> Self {
636        Self {
637            passive: false,
638            durable: true,
639            exclusive: false,
640            auto_delete: false,
641            arguments: FieldTable::default(),
642        }
643    }
644}
645
646/// Custom Exchange declaration options (wrapper around lapin's options)
647#[derive(Debug, Clone)]
648pub struct CustomExchangeDeclareOptions {
649    pub passive: bool,
650    pub durable: bool,
651    pub auto_delete: bool,
652    pub internal: bool,
653    pub exchange_type: ExchangeKind,
654    pub original_type: ExchangeKind, // Used for delayed message exchanges
655    pub arguments: FieldTable,
656}
657
658impl Default for CustomExchangeDeclareOptions {
659    fn default() -> Self {
660        Self {
661            passive: false,
662            durable: true,
663            auto_delete: false,
664            internal: false,
665            exchange_type: ExchangeKind::Direct,
666            original_type: ExchangeKind::Direct,
667            arguments: FieldTable::default(),
668        }
669    }
670}
671
#[cfg(test)]
mod tests {
    use super::*;

    /// Default publish options are persistent, carry a message id, and leave
    /// every optional field and auto-declare flag unset.
    #[test]
    fn test_publish_options_default() {
        let PublishOptions {
            persistent,
            message_id,
            correlation_id,
            reply_to,
            ttl,
            priority,
            headers,
            auto_declare_queue,
            auto_declare_exchange,
            ..
        } = PublishOptions::default();

        assert!(persistent);
        assert!(message_id.is_some());
        assert!(correlation_id.is_none());
        assert!(reply_to.is_none());
        assert!(ttl.is_none());
        assert!(priority.is_none());
        assert!(headers.is_empty());
        assert!(!auto_declare_queue);
        assert!(!auto_declare_exchange);
    }

    /// Default queue options are durable with all other flags off.
    #[test]
    fn test_queue_declare_options_default() {
        let CustomQueueDeclareOptions {
            passive,
            durable,
            exclusive,
            auto_delete,
            ..
        } = CustomQueueDeclareOptions::default();

        assert!(!passive);
        assert!(durable);
        assert!(!exclusive);
        assert!(!auto_delete);
    }

    /// Default exchange options are a durable direct exchange with all other flags off.
    #[test]
    fn test_exchange_declare_options_default() {
        let CustomExchangeDeclareOptions {
            passive,
            durable,
            auto_delete,
            internal,
            exchange_type,
            original_type,
            ..
        } = CustomExchangeDeclareOptions::default();

        assert!(!passive);
        assert!(durable);
        assert!(!auto_delete);
        assert!(!internal);
        assert!(matches!(exchange_type, ExchangeKind::Direct));
        assert!(matches!(original_type, ExchangeKind::Direct));
    }
}
709}