rust_rabbit/
publisher.rs

1use crate::{
2    connection::ConnectionManager,
3    error::{RabbitError, Result},
4    metrics::{MetricsTimer, RustRabbitMetrics},
5};
6use lapin::{
7    options::{
8        BasicPublishOptions, ExchangeDeclareOptions as LapinExchangeDeclareOptions,
9        QueueDeclareOptions as LapinQueueDeclareOptions,
10    },
11    types::FieldTable,
12    BasicProperties, Channel, ExchangeKind,
13};
14use serde::Serialize;
15use std::{collections::HashMap, time::Duration};
16use tracing::debug;
17use uuid::Uuid;
18
19/// Publisher for sending messages to RabbitMQ
20#[derive(Debug, Clone)]
21pub struct Publisher {
22    connection_manager: ConnectionManager,
23    metrics: Option<RustRabbitMetrics>,
24}
25
26impl Publisher {
27    /// Create a new publisher
28    pub fn new(connection_manager: ConnectionManager) -> Self {
29        Self {
30            connection_manager,
31            metrics: None,
32        }
33    }
34
35    /// Set metrics for this publisher
36    pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
37        self.metrics = Some(metrics);
38    }
39
40    /// Publish a message to a queue
41    pub async fn publish_to_queue<T>(
42        &self,
43        queue_name: &str,
44        message: &T,
45        options: Option<PublishOptions>,
46    ) -> Result<()>
47    where
48        T: Serialize,
49    {
50        let timer = MetricsTimer::new();
51        let channel = self.get_channel().await?;
52
53        // Declare queue if auto_declare is enabled
54        let options = options.unwrap_or_default();
55        if options.auto_declare_queue {
56            self.declare_queue(&channel, queue_name, &options.queue_options)
57                .await?;
58        }
59
60        let payload = self.serialize_message(message)?;
61        let properties = self.build_basic_properties(&options)?;
62
63        let result = channel
64            .basic_publish(
65                "", // default exchange
66                queue_name,
67                BasicPublishOptions::default(),
68                &payload,
69                properties,
70            )
71            .await;
72
73        // Record metrics
74        if let Some(metrics) = &self.metrics {
75            match &result {
76                Ok(_) => {
77                    metrics.record_message_published(queue_name, "", queue_name);
78                    metrics.record_publish_duration(queue_name, "", timer.elapsed());
79                }
80                Err(_) => {
81                    // Error will be handled by the caller, but we can still record timing
82                    metrics.record_publish_duration(queue_name, "", timer.elapsed());
83                }
84            }
85        }
86
87        result?;
88        debug!("Published message to queue: {}", queue_name);
89        Ok(())
90    }
91
92    /// Publish a message to an exchange
93    pub async fn publish_to_exchange<T>(
94        &self,
95        exchange_name: &str,
96        routing_key: &str,
97        message: &T,
98        options: Option<PublishOptions>,
99    ) -> Result<()>
100    where
101        T: Serialize,
102    {
103        let timer = MetricsTimer::new();
104        let channel = self.get_channel().await?;
105
106        // Declare exchange if auto_declare is enabled
107        let options = options.unwrap_or_default();
108        if options.auto_declare_exchange {
109            self.declare_exchange(&channel, exchange_name, &options.exchange_options)
110                .await?;
111        }
112
113        let payload = self.serialize_message(message)?;
114        let properties = self.build_basic_properties(&options)?;
115
116        let result = channel
117            .basic_publish(
118                exchange_name,
119                routing_key,
120                BasicPublishOptions::default(),
121                &payload,
122                properties,
123            )
124            .await;
125
126        // Record metrics
127        if let Some(metrics) = &self.metrics {
128            match &result {
129                Ok(_) => {
130                    metrics.record_message_published("", exchange_name, routing_key);
131                    metrics.record_publish_duration("", exchange_name, timer.elapsed());
132                }
133                Err(_) => {
134                    metrics.record_publish_duration("", exchange_name, timer.elapsed());
135                }
136            }
137        }
138
139        result?;
140        debug!(
141            "Published message to exchange: {} with routing key: {}",
142            exchange_name, routing_key
143        );
144        Ok(())
145    }
146
147    /// Publish a delayed message using the delayed message exchange plugin
148    pub async fn publish_delayed<T>(
149        &self,
150        exchange_name: &str,
151        routing_key: &str,
152        message: &T,
153        delay: Duration,
154        options: Option<PublishOptions>,
155    ) -> Result<()>
156    where
157        T: Serialize,
158    {
159        let channel = self.get_channel().await?;
160
161        let options = options.unwrap_or_default();
162
163        // Ensure the exchange is declared as delayed type
164        if options.auto_declare_exchange {
165            let mut exchange_opts = options.exchange_options.clone();
166            exchange_opts.exchange_type = ExchangeKind::Custom("x-delayed-message".to_string());
167
168            // We'll handle arguments in the declare_exchange method
169            self.declare_exchange(&channel, exchange_name, &exchange_opts)
170                .await?;
171        }
172
173        let payload = self.serialize_message(message)?;
174        let mut properties = self.build_basic_properties(&options)?;
175
176        // Add delay header
177        let mut headers = properties.headers().clone().unwrap_or_default();
178        headers.insert(
179            "x-delay".into(),
180            lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
181        );
182        properties = properties.with_headers(headers);
183
184        channel
185            .basic_publish(
186                exchange_name,
187                routing_key,
188                BasicPublishOptions::default(),
189                &payload,
190                properties,
191            )
192            .await?;
193
194        debug!(
195            "Published delayed message to exchange: {} with delay: {:?}",
196            exchange_name, delay
197        );
198        Ok(())
199    }
200
201    /// Publish a message with TTL (Time To Live)
202    pub async fn publish_with_ttl<T>(
203        &self,
204        exchange_name: &str,
205        routing_key: &str,
206        message: &T,
207        ttl: Duration,
208        options: Option<PublishOptions>,
209    ) -> Result<()>
210    where
211        T: Serialize,
212    {
213        let mut options = options.unwrap_or_default();
214        options.ttl = Some(ttl);
215
216        self.publish_to_exchange(exchange_name, routing_key, message, Some(options))
217            .await
218    }
219
220    /// Get a channel from the connection manager
221    async fn get_channel(&self) -> Result<Channel> {
222        let connection = self.connection_manager.get_connection().await?;
223        connection.create_channel().await
224    }
225
226    /// Serialize message to bytes
227    fn serialize_message<T>(&self, message: &T) -> Result<Vec<u8>>
228    where
229        T: Serialize,
230    {
231        serde_json::to_vec(message).map_err(RabbitError::Serialization)
232    }
233
234    /// Build BasicProperties from PublishOptions
235    fn build_basic_properties(&self, options: &PublishOptions) -> Result<BasicProperties> {
236        let mut properties = BasicProperties::default()
237            .with_content_type("application/json".into())
238            .with_delivery_mode(if options.persistent { 2 } else { 1 });
239
240        if let Some(message_id) = &options.message_id {
241            properties = properties.with_message_id(message_id.clone().into());
242        }
243
244        if let Some(correlation_id) = &options.correlation_id {
245            properties = properties.with_correlation_id(correlation_id.clone().into());
246        }
247
248        if let Some(reply_to) = &options.reply_to {
249            properties = properties.with_reply_to(reply_to.clone().into());
250        }
251
252        if let Some(ttl) = options.ttl {
253            properties = properties.with_expiration(ttl.as_millis().to_string().into());
254        }
255
256        if let Some(priority) = options.priority {
257            properties = properties.with_priority(priority);
258        }
259
260        if !options.headers.is_empty() {
261            let mut field_table = FieldTable::default();
262            for (key, value) in &options.headers {
263                field_table.insert(key.clone().into(), value.clone());
264            }
265            properties = properties.with_headers(field_table);
266        }
267
268        properties = properties.with_timestamp(chrono::Utc::now().timestamp() as u64);
269
270        Ok(properties)
271    }
272
273    /// Declare a queue
274    async fn declare_queue(
275        &self,
276        channel: &Channel,
277        queue_name: &str,
278        options: &CustomQueueDeclareOptions,
279    ) -> Result<()> {
280        let queue_options = LapinQueueDeclareOptions {
281            passive: options.passive,
282            durable: options.durable,
283            exclusive: options.exclusive,
284            auto_delete: options.auto_delete,
285            nowait: false,
286        };
287
288        channel
289            .queue_declare(queue_name, queue_options, options.arguments.clone())
290            .await?;
291
292        debug!("Declared queue: {}", queue_name);
293        Ok(())
294    }
295
296    /// Declare an exchange
297    async fn declare_exchange(
298        &self,
299        channel: &Channel,
300        exchange_name: &str,
301        options: &CustomExchangeDeclareOptions,
302    ) -> Result<()> {
303        let exchange_kind = match &options.exchange_type {
304            ExchangeKind::Custom(custom_type) => {
305                if custom_type == "x-delayed-message" {
306                    // For delayed message exchange, we need special handling
307                    let mut arguments = options.arguments.clone();
308                    let original_type_str = match &options.original_type {
309                        ExchangeKind::Direct => "direct",
310                        ExchangeKind::Fanout => "fanout",
311                        ExchangeKind::Topic => "topic",
312                        ExchangeKind::Headers => "headers",
313                        ExchangeKind::Custom(custom) => custom,
314                    };
315                    arguments.insert(
316                        "x-delayed-type".into(),
317                        lapin::types::AMQPValue::LongString(original_type_str.into()),
318                    );
319
320                    let exchange_options = LapinExchangeDeclareOptions {
321                        passive: options.passive,
322                        durable: options.durable,
323                        auto_delete: options.auto_delete,
324                        internal: options.internal,
325                        nowait: false,
326                    };
327
328                    channel
329                        .exchange_declare(
330                            exchange_name,
331                            ExchangeKind::Custom("x-delayed-message".to_string()),
332                            exchange_options,
333                            arguments,
334                        )
335                        .await?;
336
337                    debug!("Declared delayed message exchange: {}", exchange_name);
338                    return Ok(());
339                } else {
340                    lapin::ExchangeKind::Custom(custom_type.clone())
341                }
342            }
343            other => other.clone(),
344        };
345
346        let exchange_options = LapinExchangeDeclareOptions {
347            passive: options.passive,
348            durable: options.durable,
349            auto_delete: options.auto_delete,
350            internal: options.internal,
351            nowait: false,
352        };
353
354        channel
355            .exchange_declare(
356                exchange_name,
357                exchange_kind,
358                exchange_options,
359                options.arguments.clone(),
360            )
361            .await?;
362
363        debug!(
364            "Declared exchange: {} of type: {:?}",
365            exchange_name, options.exchange_type
366        );
367        Ok(())
368    }
369}
370
371/// Options for publishing messages
372#[derive(Debug, Clone)]
373pub struct PublishOptions {
374    /// Whether the message should be persistent
375    pub persistent: bool,
376
377    /// Message ID
378    pub message_id: Option<String>,
379
380    /// Correlation ID for request-response patterns
381    pub correlation_id: Option<String>,
382
383    /// Reply-to queue for RPC patterns
384    pub reply_to: Option<String>,
385
386    /// Message Time To Live
387    pub ttl: Option<Duration>,
388
389    /// Message priority (0-255)
390    pub priority: Option<u8>,
391
392    /// Custom headers
393    pub headers: HashMap<String, lapin::types::AMQPValue>,
394
395    /// Auto-declare queue before publishing
396    pub auto_declare_queue: bool,
397
398    /// Auto-declare exchange before publishing
399    pub auto_declare_exchange: bool,
400
401    /// Queue declaration options
402    pub queue_options: CustomQueueDeclareOptions,
403
404    /// Exchange declaration options
405    pub exchange_options: CustomExchangeDeclareOptions,
406}
407
408impl PublishOptions {
409    /// Create a new publish options builder
410    pub fn builder() -> PublishOptionsBuilder {
411        PublishOptionsBuilder::new()
412    }
413}
414
415/// Builder for PublishOptions
416#[derive(Debug, Clone)]
417pub struct PublishOptionsBuilder {
418    persistent: bool,
419    message_id: Option<String>,
420    correlation_id: Option<String>,
421    reply_to: Option<String>,
422    ttl: Option<Duration>,
423    priority: Option<u8>,
424    headers: HashMap<String, lapin::types::AMQPValue>,
425    auto_declare_queue: bool,
426    auto_declare_exchange: bool,
427    queue_options: CustomQueueDeclareOptions,
428    exchange_options: CustomExchangeDeclareOptions,
429}
430
431impl PublishOptionsBuilder {
432    /// Create a new builder with default values
433    pub fn new() -> Self {
434        Self {
435            persistent: true,
436            message_id: Some(Uuid::new_v4().to_string()),
437            correlation_id: None,
438            reply_to: None,
439            ttl: None,
440            priority: None,
441            headers: HashMap::new(),
442            auto_declare_queue: false,
443            auto_declare_exchange: false,
444            queue_options: CustomQueueDeclareOptions::default(),
445            exchange_options: CustomExchangeDeclareOptions::default(),
446        }
447    }
448
449    /// Set message persistence
450    pub fn persistent(mut self, persistent: bool) -> Self {
451        self.persistent = persistent;
452        self
453    }
454
455    /// Make message persistent
456    pub fn durable(mut self) -> Self {
457        self.persistent = true;
458        self
459    }
460
461    /// Make message non-persistent
462    pub fn transient(mut self) -> Self {
463        self.persistent = false;
464        self
465    }
466
467    /// Set message ID
468    pub fn message_id<S: Into<String>>(mut self, id: S) -> Self {
469        self.message_id = Some(id.into());
470        self
471    }
472
473    /// Generate random message ID
474    pub fn random_message_id(mut self) -> Self {
475        self.message_id = Some(Uuid::new_v4().to_string());
476        self
477    }
478
479    /// Clear message ID
480    pub fn no_message_id(mut self) -> Self {
481        self.message_id = None;
482        self
483    }
484
485    /// Set correlation ID
486    pub fn correlation_id<S: Into<String>>(mut self, id: S) -> Self {
487        self.correlation_id = Some(id.into());
488        self
489    }
490
491    /// Set reply-to queue
492    pub fn reply_to<S: Into<String>>(mut self, queue: S) -> Self {
493        self.reply_to = Some(queue.into());
494        self
495    }
496
497    /// Set message TTL
498    pub fn ttl(mut self, ttl: Duration) -> Self {
499        self.ttl = Some(ttl);
500        self
501    }
502
503    /// Set message priority
504    pub fn priority(mut self, priority: u8) -> Self {
505        self.priority = Some(priority);
506        self
507    }
508
509    /// Add a custom header
510    pub fn header<S: Into<String>>(mut self, key: S, value: lapin::types::AMQPValue) -> Self {
511        self.headers.insert(key.into(), value);
512        self
513    }
514
515    /// Add a string header
516    pub fn header_string<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
517        self.headers.insert(
518            key.into(),
519            lapin::types::AMQPValue::LongString(value.into().into()),
520        );
521        self
522    }
523
524    /// Add an integer header
525    pub fn header_int<K: Into<String>>(mut self, key: K, value: i64) -> Self {
526        self.headers
527            .insert(key.into(), lapin::types::AMQPValue::LongLongInt(value));
528        self
529    }
530
531    /// Enable auto-declare queue
532    pub fn auto_declare_queue(mut self) -> Self {
533        self.auto_declare_queue = true;
534        self
535    }
536
537    /// Enable auto-declare exchange
538    pub fn auto_declare_exchange(mut self) -> Self {
539        self.auto_declare_exchange = true;
540        self
541    }
542
543    /// Set queue options
544    pub fn queue_options(mut self, options: CustomQueueDeclareOptions) -> Self {
545        self.queue_options = options;
546        self
547    }
548
549    /// Set exchange options
550    pub fn exchange_options(mut self, options: CustomExchangeDeclareOptions) -> Self {
551        self.exchange_options = options;
552        self
553    }
554
555    /// Configure for request-response pattern
556    pub fn request_response<S: Into<String>>(mut self, reply_to: S, correlation_id: S) -> Self {
557        self.reply_to = Some(reply_to.into());
558        self.correlation_id = Some(correlation_id.into());
559        self
560    }
561
562    /// Configure for development (auto-declare everything)
563    pub fn development(mut self) -> Self {
564        self.auto_declare_queue = true;
565        self.auto_declare_exchange = true;
566        self
567    }
568
569    /// Configure for production (no auto-declare)
570    pub fn production(mut self) -> Self {
571        self.auto_declare_queue = false;
572        self.auto_declare_exchange = false;
573        self.persistent = true;
574        self
575    }
576
577    /// Build the final options
578    pub fn build(self) -> PublishOptions {
579        PublishOptions {
580            persistent: self.persistent,
581            message_id: self.message_id,
582            correlation_id: self.correlation_id,
583            reply_to: self.reply_to,
584            ttl: self.ttl,
585            priority: self.priority,
586            headers: self.headers,
587            auto_declare_queue: self.auto_declare_queue,
588            auto_declare_exchange: self.auto_declare_exchange,
589            queue_options: self.queue_options,
590            exchange_options: self.exchange_options,
591        }
592    }
593}
594
595impl Default for PublishOptionsBuilder {
596    fn default() -> Self {
597        Self::new()
598    }
599}
600
601impl Default for PublishOptions {
602    fn default() -> Self {
603        Self {
604            persistent: true,
605            message_id: Some(Uuid::new_v4().to_string()),
606            correlation_id: None,
607            reply_to: None,
608            ttl: None,
609            priority: None,
610            headers: HashMap::new(),
611            auto_declare_queue: false,
612            auto_declare_exchange: false,
613            queue_options: CustomQueueDeclareOptions::default(),
614            exchange_options: CustomExchangeDeclareOptions::default(),
615        }
616    }
617}
618
619/// Custom Queue declaration options (wrapper around lapin's options)
620#[derive(Debug, Clone)]
621pub struct CustomQueueDeclareOptions {
622    pub passive: bool,
623    pub durable: bool,
624    pub exclusive: bool,
625    pub auto_delete: bool,
626    pub arguments: FieldTable,
627}
628
629impl Default for CustomQueueDeclareOptions {
630    fn default() -> Self {
631        Self {
632            passive: false,
633            durable: true,
634            exclusive: false,
635            auto_delete: false,
636            arguments: FieldTable::default(),
637        }
638    }
639}
640
641/// Custom Exchange declaration options (wrapper around lapin's options)
642#[derive(Debug, Clone)]
643pub struct CustomExchangeDeclareOptions {
644    pub passive: bool,
645    pub durable: bool,
646    pub auto_delete: bool,
647    pub internal: bool,
648    pub exchange_type: ExchangeKind,
649    pub original_type: ExchangeKind, // Used for delayed message exchanges
650    pub arguments: FieldTable,
651}
652
653impl Default for CustomExchangeDeclareOptions {
654    fn default() -> Self {
655        Self {
656            passive: false,
657            durable: true,
658            auto_delete: false,
659            internal: false,
660            exchange_type: ExchangeKind::Direct,
661            original_type: ExchangeKind::Direct,
662            arguments: FieldTable::default(),
663        }
664    }
665}
666
#[cfg(test)]
mod tests {
    use super::*;

    /// Default publish options are persistent, carry a generated message id,
    /// and leave every optional setting unset.
    #[test]
    fn test_publish_options_default() {
        let PublishOptions {
            persistent,
            message_id,
            correlation_id,
            reply_to,
            ttl,
            priority,
            headers,
            auto_declare_queue,
            auto_declare_exchange,
            ..
        } = PublishOptions::default();

        assert!(persistent);
        assert!(message_id.is_some());
        assert!(correlation_id.is_none());
        assert!(reply_to.is_none());
        assert!(ttl.is_none());
        assert!(priority.is_none());
        assert!(headers.is_empty());
        assert!(!auto_declare_queue);
        assert!(!auto_declare_exchange);
    }

    /// Default queue options: durable, all other flags off.
    #[test]
    fn test_queue_declare_options_default() {
        let defaults = CustomQueueDeclareOptions::default();
        assert_eq!(
            (
                defaults.passive,
                defaults.durable,
                defaults.exclusive,
                defaults.auto_delete
            ),
            (false, true, false, false)
        );
    }

    /// Default exchange options: durable direct exchange, all other flags off.
    #[test]
    fn test_exchange_declare_options_default() {
        let defaults = CustomExchangeDeclareOptions::default();
        assert_eq!(
            (
                defaults.passive,
                defaults.durable,
                defaults.auto_delete,
                defaults.internal
            ),
            (false, true, false, false)
        );
        assert!(matches!(defaults.exchange_type, ExchangeKind::Direct));
        assert!(matches!(defaults.original_type, ExchangeKind::Direct));
    }
}