rust_rabbit/
publisher.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{MassTransitEnvelope, MessageEnvelope},
5};
6use lapin::{
7    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
8    types::{AMQPValue, FieldTable},
9    BasicProperties, Channel, ExchangeKind,
10};
11use serde::Serialize;
12use std::sync::Arc;
13use tracing::debug;
14use url::Url;
15
16/// Publish options builder
17#[derive(Debug, Clone, Default)]
18pub struct PublishOptions {
19    pub mandatory: bool,
20    pub immediate: bool,
21    pub expiration: Option<String>,
22    pub priority: Option<u8>,
23    /// Enable MassTransit format conversion
24    pub masstransit: Option<MassTransitOptions>,
25}
26
/// MassTransit-specific options for message publishing.
///
/// Values compare field by field (`PartialEq`/`Eq`), which makes option sets easy to
/// assert on in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MassTransitOptions {
    /// Message type in URN format: "urn:message:Namespace:TypeName"
    /// or simple format: "Namespace:TypeName" (will be converted to URN)
    pub message_type: String,
    /// Optional correlation ID
    pub correlation_id: Option<String>,
    /// Optional source address (defaults to exchange/queue if not provided)
    pub source_address: Option<String>,
    /// Optional destination address (defaults to routing_key/queue if not provided)
    pub destination_address: Option<String>,
}
40
41impl PublishOptions {
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    pub fn mandatory(mut self) -> Self {
47        self.mandatory = true;
48        self
49    }
50
51    pub fn priority(mut self, priority: u8) -> Self {
52        self.priority = Some(priority);
53        self
54    }
55
56    pub fn with_expiration(mut self, expiration: impl Into<String>) -> Self {
57        self.expiration = Some(expiration.into());
58        self
59    }
60
61    pub fn with_priority(mut self, priority: u8) -> Self {
62        self.priority = Some(priority);
63        self
64    }
65
66    /// Enable MassTransit format conversion
67    /// Message type can be in format "Namespace:TypeName" or "urn:message:Namespace:TypeName"
68    pub fn with_masstransit(mut self, message_type: impl Into<String>) -> Self {
69        self.masstransit = Some(MassTransitOptions {
70            message_type: message_type.into(),
71            correlation_id: None,
72            source_address: None,
73            destination_address: None,
74        });
75        self
76    }
77
78    /// Enable MassTransit format with full options
79    pub fn with_masstransit_options(mut self, options: MassTransitOptions) -> Self {
80        self.masstransit = Some(options);
81        self
82    }
83}
84
85impl MassTransitOptions {
86    /// Create new MassTransit options with message type
87    pub fn new(message_type: impl Into<String>) -> Self {
88        Self {
89            message_type: message_type.into(),
90            correlation_id: None,
91            source_address: None,
92            destination_address: None,
93        }
94    }
95
96    /// Set correlation ID
97    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
98        self.correlation_id = Some(correlation_id.into());
99        self
100    }
101
102    /// Set source address
103    pub fn with_source_address(mut self, source_address: impl Into<String>) -> Self {
104        self.source_address = Some(source_address.into());
105        self
106    }
107
108    /// Set destination address
109    pub fn with_destination_address(mut self, destination_address: impl Into<String>) -> Self {
110        self.destination_address = Some(destination_address.into());
111        self
112    }
113}
114
115/// Simplified Publisher for message publishing
116pub struct Publisher {
117    connection: Arc<Connection>,
118}
119
120impl Publisher {
121    /// Create a new publisher
122    pub fn new(connection: Arc<Connection>) -> Self {
123        Self { connection }
124    }
125
126    /// Publish message to an exchange
127    pub async fn publish_to_exchange<T>(
128        &self,
129        exchange: &str,
130        routing_key: &str,
131        message: &T,
132        options: Option<PublishOptions>,
133    ) -> Result<(), RustRabbitError>
134    where
135        T: Serialize,
136    {
137        let channel = self.connection.create_channel().await?;
138
139        // Declare exchange (simplified - always topic for flexibility)
140        channel
141            .exchange_declare(
142                exchange,
143                ExchangeKind::Topic,
144                ExchangeDeclareOptions {
145                    durable: true,
146                    ..Default::default()
147                },
148                FieldTable::default(),
149            )
150            .await?;
151
152        self.publish_message(&channel, exchange, routing_key, message, options)
153            .await
154    }
155
156    /// Publish message directly to a queue
157    pub async fn publish_to_queue<T>(
158        &self,
159        queue: &str,
160        message: &T,
161        options: Option<PublishOptions>,
162    ) -> Result<(), RustRabbitError>
163    where
164        T: Serialize,
165    {
166        let channel = self.connection.create_channel().await?;
167
168        // Declare queue
169        channel
170            .queue_declare(
171                queue,
172                QueueDeclareOptions {
173                    durable: true,
174                    ..Default::default()
175                },
176                FieldTable::default(),
177            )
178            .await?;
179
180        // Publish to default exchange with queue name as routing key
181        self.publish_message(&channel, "", queue, message, options)
182            .await
183    }
184
185    /// Internal method to publish message
186    /// Publishes raw payload with headers (retry_attempt and correlation_id in headers)
187    /// If MassTransit options are provided, wraps message in MassTransit envelope format
188    async fn publish_message<T>(
189        &self,
190        channel: &Channel,
191        exchange: &str,
192        routing_key: &str,
193        message: &T,
194        options: Option<PublishOptions>,
195    ) -> Result<(), RustRabbitError>
196    where
197        T: Serialize,
198    {
199        let options = options.unwrap_or_default();
200
201        // Check if MassTransit conversion is requested
202        let payload = if let Some(mt_options) = &options.masstransit {
203            // Create MassTransit envelope
204            let mut envelope =
205                MassTransitEnvelope::with_message_type(message, &mt_options.message_type)
206                    .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
207
208            // Set correlation ID if provided
209            if let Some(corr_id) = &mt_options.correlation_id {
210                envelope = envelope.with_correlation_id(corr_id.clone());
211            }
212
213            // Extract host from connection URL for MassTransit addresses
214            let host = self
215                .connection
216                .url()
217                .parse::<Url>()
218                .ok()
219                .and_then(|url| url.host_str().map(|h| h.to_string()))
220                .unwrap_or_else(|| "localhost".to_string());
221
222            // Set source address (default to exchange if not provided)
223            let source = mt_options
224                .source_address
225                .clone()
226                .unwrap_or_else(|| format!("rabbitmq://{}/{}", host, exchange));
227            envelope = envelope.with_source_address(source);
228
229            // Set destination address (default to routing key if not provided)
230            let dest = mt_options
231                .destination_address
232                .clone()
233                .unwrap_or_else(|| format!("rabbitmq://{}/{}", host, routing_key));
234            envelope = envelope.with_destination_address(dest);
235
236            // Serialize MassTransit envelope
237            serde_json::to_vec(&envelope)
238                .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
239        } else {
240            // Serialize raw payload (no wrapper)
241            serde_json::to_vec(message)
242                .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
243        };
244
245        // Build properties with headers
246        // Create headers with retry_attempt = 0 (first attempt)
247        let mut headers = FieldTable::default();
248        headers.insert("x-retry-attempt".into(), AMQPValue::LongLongInt(0));
249
250        let mut properties = BasicProperties::default()
251            .with_content_type("application/json".into())
252            .with_delivery_mode(2) // Persistent
253            .with_headers(headers);
254
255        if let Some(expiration) = options.expiration {
256            properties = properties.with_expiration(expiration.into());
257        }
258
259        if let Some(priority) = options.priority {
260            properties = properties.with_priority(priority);
261        }
262
263        // Publish message
264        let confirm = channel
265            .basic_publish(
266                exchange,
267                routing_key,
268                BasicPublishOptions {
269                    mandatory: options.mandatory,
270                    immediate: options.immediate,
271                },
272                &payload,
273                properties,
274            )
275            .await?;
276
277        // Wait for confirmation (simplified)
278        confirm.await?;
279
280        if options.masstransit.is_some() {
281            debug!(
282                "Published MassTransit message to exchange '{}' with routing key '{}'",
283                exchange, routing_key
284            );
285        } else {
286            debug!(
287                "Published message to exchange '{}' with routing key '{}'",
288                exchange, routing_key
289            );
290        }
291
292        Ok(())
293    }
294
295    /// Publish a message envelope to an exchange (includes retry metadata)
296    pub async fn publish_envelope_to_exchange<T>(
297        &self,
298        exchange: &str,
299        routing_key: &str,
300        envelope: &MessageEnvelope<T>,
301        options: Option<PublishOptions>,
302    ) -> Result<(), RustRabbitError>
303    where
304        T: Serialize,
305    {
306        self.publish_to_exchange(exchange, routing_key, envelope, options)
307            .await
308    }
309
310    /// Publish a message envelope directly to a queue (includes retry metadata)
311    pub async fn publish_envelope_to_queue<T>(
312        &self,
313        queue: &str,
314        envelope: &MessageEnvelope<T>,
315        options: Option<PublishOptions>,
316    ) -> Result<(), RustRabbitError>
317    where
318        T: Serialize,
319    {
320        self.publish_to_queue(queue, envelope, options).await
321    }
322
323    /// Create a message envelope with source tracking and publish to exchange
324    pub async fn publish_with_envelope<T>(
325        &self,
326        exchange: &str,
327        routing_key: &str,
328        payload: &T,
329        source_queue: &str,
330        max_retries: u32,
331        options: Option<PublishOptions>,
332    ) -> Result<(), RustRabbitError>
333    where
334        T: Serialize + Clone,
335    {
336        let envelope = MessageEnvelope::with_source(
337            payload.clone(),
338            source_queue,
339            Some(exchange),
340            Some(routing_key),
341            Some("rust-rabbit-publisher"), // Publisher identifier
342        )
343        .with_max_retries(max_retries);
344
345        self.publish_envelope_to_exchange(exchange, routing_key, &envelope, options)
346            .await
347    }
348
349    /// Create a message envelope and publish directly to queue
350    pub async fn publish_with_envelope_to_queue<T>(
351        &self,
352        queue: &str,
353        payload: &T,
354        max_retries: u32,
355        options: Option<PublishOptions>,
356    ) -> Result<(), RustRabbitError>
357    where
358        T: Serialize + Clone,
359    {
360        let envelope = MessageEnvelope::new(payload.clone(), queue).with_max_retries(max_retries);
361
362        self.publish_envelope_to_queue(queue, &envelope, options)
363            .await
364    }
365
366    /// Publish a message to MassTransit-compatible exchange
367    /// This ensures the message format matches MassTransit's expectations
368    ///
369    /// # Arguments
370    /// * `exchange` - Exchange name (MassTransit typically uses exchange names)
371    /// * `routing_key` - Routing key (often the message type name)
372    /// * `message` - The message payload to publish
373    /// * `message_type` - Message type name (e.g., "YourNamespace:YourMessageType") - required for MassTransit routing
374    /// * `options` - Optional publish options
375    ///
376    /// # Example
377    /// ```rust,no_run
378    /// use rust_rabbit::{Connection, Publisher};
379    ///
380    /// #[derive(serde::Serialize)]
381    /// struct OrderCreated {
382    ///     order_id: u32,
383    ///     amount: f64,
384    /// }
385    ///
386    /// #[tokio::main]
387    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
388    ///     let connection = Connection::new("amqp://localhost:5672").await?;
389    ///     let publisher = Publisher::new(connection);
390    ///     
391    ///     let order = OrderCreated { order_id: 123, amount: 99.99 };
392    ///     publisher.publish_masstransit_to_exchange(
393    ///         "order-exchange",
394    ///         "order.created",
395    ///         &order,
396    ///         "Contracts:OrderCreated", // Message type for MassTransit
397    ///         None
398    ///     ).await?;
399    ///     
400    ///     Ok(())
401    /// }
402    /// ```
403    pub async fn publish_masstransit_to_exchange<T>(
404        &self,
405        exchange: &str,
406        routing_key: &str,
407        message: &T,
408        message_type: &str,
409        options: Option<PublishOptions>,
410    ) -> Result<(), RustRabbitError>
411    where
412        T: Serialize,
413    {
414        let channel = self.connection.create_channel().await?;
415
416        // Declare exchange (MassTransit typically uses topic exchanges)
417        channel
418            .exchange_declare(
419                exchange,
420                ExchangeKind::Topic,
421                ExchangeDeclareOptions {
422                    durable: true,
423                    ..Default::default()
424                },
425                FieldTable::default(),
426            )
427            .await?;
428
429        // Extract host from connection URL for MassTransit addresses
430        let host = self
431            .connection
432            .url()
433            .parse::<Url>()
434            .ok()
435            .and_then(|url| url.host_str().map(|h| h.to_string()))
436            .unwrap_or_else(|| "localhost".to_string());
437
438        // Create MassTransit envelope with message type
439        let envelope = MassTransitEnvelope::with_message_type(message, message_type)
440            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
441            .with_source_address(format!("rabbitmq://{}/{}", host, exchange))
442            .with_destination_address(format!("rabbitmq://{}/{}", host, routing_key));
443
444        // Serialize envelope
445        let payload = serde_json::to_vec(&envelope)
446            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
447
448        // Build properties
449        let options = options.unwrap_or_default();
450        let mut properties = BasicProperties::default()
451            .with_content_type("application/json".into())
452            .with_delivery_mode(2) // Persistent
453            .with_headers(FieldTable::default());
454
455        if let Some(expiration) = options.expiration {
456            properties = properties.with_expiration(expiration.into());
457        }
458
459        if let Some(priority) = options.priority {
460            properties = properties.with_priority(priority);
461        }
462
463        // Publish message
464        let confirm = channel
465            .basic_publish(
466                exchange,
467                routing_key,
468                BasicPublishOptions {
469                    mandatory: options.mandatory,
470                    immediate: options.immediate,
471                },
472                &payload,
473                properties,
474            )
475            .await?;
476
477        confirm.await?;
478
479        debug!(
480            "Published MassTransit message to exchange '{}' with routing key '{}' (type: {})",
481            exchange, routing_key, message_type
482        );
483
484        Ok(())
485    }
486
487    /// Publish a message to MassTransit-compatible queue
488    /// This ensures the message format matches MassTransit's expectations
489    ///
490    /// # Arguments
491    /// * `queue` - Queue name
492    /// * `message` - The message payload to publish
493    /// * `message_type` - Message type name (e.g., "YourNamespace:YourMessageType") - required for MassTransit routing
494    /// * `options` - Optional publish options
495    ///
496    /// # Example
497    /// ```rust,no_run
498    /// use rust_rabbit::{Connection, Publisher};
499    ///
500    /// #[derive(serde::Serialize)]
501    /// struct OrderCreated {
502    ///     order_id: u32,
503    ///     amount: f64,
504    /// }
505    ///
506    /// #[tokio::main]
507    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
508    ///     let connection = Connection::new("amqp://localhost:5672").await?;
509    ///     let publisher = Publisher::new(connection);
510    ///     
511    ///     let order = OrderCreated { order_id: 123, amount: 99.99 };
512    ///     publisher.publish_masstransit_to_queue(
513    ///         "order-queue",
514    ///         &order,
515    ///         "Contracts:OrderCreated", // Message type for MassTransit
516    ///         None
517    ///     ).await?;
518    ///     
519    ///     Ok(())
520    /// }
521    /// ```
522    pub async fn publish_masstransit_to_queue<T>(
523        &self,
524        queue: &str,
525        message: &T,
526        message_type: &str,
527        options: Option<PublishOptions>,
528    ) -> Result<(), RustRabbitError>
529    where
530        T: Serialize,
531    {
532        let channel = self.connection.create_channel().await?;
533
534        // Declare queue
535        channel
536            .queue_declare(
537                queue,
538                QueueDeclareOptions {
539                    durable: true,
540                    ..Default::default()
541                },
542                FieldTable::default(),
543            )
544            .await?;
545
546        // Extract host from connection URL for MassTransit addresses
547        let host = self
548            .connection
549            .url()
550            .parse::<Url>()
551            .ok()
552            .and_then(|url| url.host_str().map(|h| h.to_string()))
553            .unwrap_or_else(|| "localhost".to_string());
554
555        // Create MassTransit envelope with message type
556        let envelope = MassTransitEnvelope::with_message_type(message, message_type)
557            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?
558            .with_source_address(format!("rabbitmq://{}/{}", host, queue))
559            .with_destination_address(format!("rabbitmq://{}/{}", host, queue));
560
561        // Serialize envelope
562        let payload = serde_json::to_vec(&envelope)
563            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
564
565        // Build properties
566        let options = options.unwrap_or_default();
567        let mut properties = BasicProperties::default()
568            .with_content_type("application/json".into())
569            .with_delivery_mode(2) // Persistent
570            .with_headers(FieldTable::default());
571
572        if let Some(expiration) = options.expiration {
573            properties = properties.with_expiration(expiration.into());
574        }
575
576        if let Some(priority) = options.priority {
577            properties = properties.with_priority(priority);
578        }
579
580        // Publish to default exchange with queue name as routing key
581        let confirm = channel
582            .basic_publish(
583                "", // Default exchange
584                queue,
585                BasicPublishOptions {
586                    mandatory: options.mandatory,
587                    immediate: options.immediate,
588                },
589                &payload,
590                properties,
591            )
592            .await?;
593
594        confirm.await?;
595
596        debug!(
597            "Published MassTransit message to queue '{}' (type: {})",
598            queue, message_type
599        );
600
601        Ok(())
602    }
603
604    /// Publish a MassTransit envelope (already created) to an exchange
605    /// Useful when you need full control over the envelope structure
606    pub async fn publish_masstransit_envelope_to_exchange(
607        &self,
608        exchange: &str,
609        routing_key: &str,
610        envelope: &MassTransitEnvelope,
611        options: Option<PublishOptions>,
612    ) -> Result<(), RustRabbitError> {
613        let channel = self.connection.create_channel().await?;
614
615        // Declare exchange
616        channel
617            .exchange_declare(
618                exchange,
619                ExchangeKind::Topic,
620                ExchangeDeclareOptions {
621                    durable: true,
622                    ..Default::default()
623                },
624                FieldTable::default(),
625            )
626            .await?;
627
628        // Serialize envelope
629        let payload = serde_json::to_vec(envelope)
630            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
631
632        // Build properties
633        let options = options.unwrap_or_default();
634        let mut properties = BasicProperties::default()
635            .with_content_type("application/json".into())
636            .with_delivery_mode(2)
637            .with_headers(FieldTable::default());
638
639        if let Some(expiration) = options.expiration {
640            properties = properties.with_expiration(expiration.into());
641        }
642
643        if let Some(priority) = options.priority {
644            properties = properties.with_priority(priority);
645        }
646
647        let confirm = channel
648            .basic_publish(
649                exchange,
650                routing_key,
651                BasicPublishOptions {
652                    mandatory: options.mandatory,
653                    immediate: options.immediate,
654                },
655                &payload,
656                properties,
657            )
658            .await?;
659
660        confirm.await?;
661
662        debug!(
663            "Published MassTransit envelope to exchange '{}' with routing key '{}'",
664            exchange, routing_key
665        );
666
667        Ok(())
668    }
669
670    /// Publish a MassTransit envelope (already created) to a queue
671    /// Useful when you need full control over the envelope structure
672    pub async fn publish_masstransit_envelope_to_queue(
673        &self,
674        queue: &str,
675        envelope: &MassTransitEnvelope,
676        options: Option<PublishOptions>,
677    ) -> Result<(), RustRabbitError> {
678        let channel = self.connection.create_channel().await?;
679
680        // Declare queue
681        channel
682            .queue_declare(
683                queue,
684                QueueDeclareOptions {
685                    durable: true,
686                    ..Default::default()
687                },
688                FieldTable::default(),
689            )
690            .await?;
691
692        // Serialize envelope
693        let payload = serde_json::to_vec(envelope)
694            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
695
696        // Build properties
697        let options = options.unwrap_or_default();
698        let mut properties = BasicProperties::default()
699            .with_content_type("application/json".into())
700            .with_delivery_mode(2)
701            .with_headers(FieldTable::default());
702
703        if let Some(expiration) = options.expiration {
704            properties = properties.with_expiration(expiration.into());
705        }
706
707        if let Some(priority) = options.priority {
708            properties = properties.with_priority(priority);
709        }
710
711        let confirm = channel
712            .basic_publish(
713                "",
714                queue,
715                BasicPublishOptions {
716                    mandatory: options.mandatory,
717                    immediate: options.immediate,
718                },
719                &payload,
720                properties,
721            )
722            .await?;
723
724        confirm.await?;
725
726        debug!("Published MassTransit envelope to queue '{}'", queue);
727
728        Ok(())
729    }
730}