rust_rabbit/
publisher.rs

1use crate::{connection::Connection, error::RustRabbitError};
2use lapin::{
3    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueDeclareOptions},
4    types::FieldTable,
5    BasicProperties, Channel, ExchangeKind,
6};
7use serde::Serialize;
8use std::sync::Arc;
9use tracing::debug;
10
/// Per-message publish settings, assembled with chained builder calls.
///
/// The default value leaves both flags `false` and both optional properties
/// unset.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PublishOptions {
    /// Forwarded as the `mandatory` flag of the publish call.
    pub mandatory: bool,
    /// Forwarded as the `immediate` flag of the publish call.
    ///
    /// NOTE(review): RabbitMQ reportedly dropped support for `immediate` in
    /// 3.0 — confirm against the target broker before setting this.
    pub immediate: bool,
    /// Value for the message `expiration` property, if any.
    ///
    /// NOTE(review): presumably a TTL in milliseconds encoded as a string —
    /// verify against the broker documentation.
    pub expiration: Option<String>,
    /// Value for the message `priority` property, if any.
    pub priority: Option<u8>,
}

impl PublishOptions {
    /// Returns options with every setting at its default.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with the `mandatory` flag switched on.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn mandatory(self) -> Self {
        Self {
            mandatory: true,
            ..self
        }
    }

    /// Returns the options with the message expiration set to `expiration`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_expiration(self, expiration: impl Into<String>) -> Self {
        Self {
            expiration: Some(expiration.into()),
            ..self
        }
    }

    /// Returns the options with the message priority set to `priority`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_priority(self, priority: u8) -> Self {
        Self {
            priority: Some(priority),
            ..self
        }
    }
}
40
41/// Simplified Publisher for message publishing
42pub struct Publisher {
43    connection: Arc<Connection>,
44}
45
46impl Publisher {
47    /// Create a new publisher
48    pub fn new(connection: Arc<Connection>) -> Self {
49        Self { connection }
50    }
51
52    /// Publish message to an exchange
53    pub async fn publish_to_exchange<T>(
54        &self,
55        exchange: &str,
56        routing_key: &str,
57        message: &T,
58        options: Option<PublishOptions>,
59    ) -> Result<(), RustRabbitError>
60    where
61        T: Serialize,
62    {
63        let channel = self.connection.create_channel().await?;
64
65        // Declare exchange (simplified - always topic for flexibility)
66        channel
67            .exchange_declare(
68                exchange,
69                ExchangeKind::Topic,
70                ExchangeDeclareOptions {
71                    durable: true,
72                    ..Default::default()
73                },
74                FieldTable::default(),
75            )
76            .await?;
77
78        self.publish_message(&channel, exchange, routing_key, message, options)
79            .await
80    }
81
82    /// Publish message directly to a queue
83    pub async fn publish_to_queue<T>(
84        &self,
85        queue: &str,
86        message: &T,
87        options: Option<PublishOptions>,
88    ) -> Result<(), RustRabbitError>
89    where
90        T: Serialize,
91    {
92        let channel = self.connection.create_channel().await?;
93
94        // Declare queue
95        channel
96            .queue_declare(
97                queue,
98                QueueDeclareOptions {
99                    durable: true,
100                    ..Default::default()
101                },
102                FieldTable::default(),
103            )
104            .await?;
105
106        // Publish to default exchange with queue name as routing key
107        self.publish_message(&channel, "", queue, message, options)
108            .await
109    }
110
111    /// Internal method to publish message
112    async fn publish_message<T>(
113        &self,
114        channel: &Channel,
115        exchange: &str,
116        routing_key: &str,
117        message: &T,
118        options: Option<PublishOptions>,
119    ) -> Result<(), RustRabbitError>
120    where
121        T: Serialize,
122    {
123        // Serialize message
124        let payload = serde_json::to_vec(message)
125            .map_err(|e| RustRabbitError::Serialization(e.to_string()))?;
126
127        // Build properties
128        let options = options.unwrap_or_default();
129        let mut properties = BasicProperties::default()
130            .with_content_type("application/json".into())
131            .with_delivery_mode(2); // Persistent
132
133        if let Some(expiration) = options.expiration {
134            properties = properties.with_expiration(expiration.into());
135        }
136
137        if let Some(priority) = options.priority {
138            properties = properties.with_priority(priority);
139        }
140
141        // Publish message
142        let confirm = channel
143            .basic_publish(
144                exchange,
145                routing_key,
146                BasicPublishOptions {
147                    mandatory: options.mandatory,
148                    immediate: options.immediate,
149                },
150                &payload,
151                properties,
152            )
153            .await?;
154
155        // Wait for confirmation (simplified)
156        confirm.await?;
157
158        debug!(
159            "Published message to exchange '{}' with routing key '{}'",
160            exchange, routing_key
161        );
162
163        Ok(())
164    }
165}