server/
producer.rs

1use azservicebus::{
2    ServiceBusClient, ServiceBusMessage, ServiceBusSender, ServiceBusSenderOptions,
3};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
/// A wrapper around an Azure Service Bus sender for producing messages to queues.
///
/// The Producer provides a high-level interface for sending messages to Azure Service Bus
/// queues. It supports sending a single message as well as a vector of messages in one call.
///
/// # Thread Safety
///
/// The underlying sender is stored as `Arc<Mutex<Option<ServiceBusSender>>>`, so every
/// operation acquires the async mutex before touching it. [`Producer::dispose`] empties the
/// `Option`; once it is `None`, the send methods return an error instead of sending.
///
/// NOTE(review): the send methods take `&mut self`, so they cannot be called concurrently
/// through a shared reference even though the sender sits behind a mutex — confirm whether
/// that is intentional.
///
/// # Examples
///
/// ```no_run
/// use quetty_server::producer::Producer;
/// use azservicebus::{ServiceBusSender, ServiceBusMessage};
///
/// async fn example(sender: ServiceBusSender) -> Result<(), Box<dyn std::error::Error>> {
///     let mut producer = Producer::new(sender);
///
///     // Send a single text message
///     let message = Producer::create_text_message("Hello, world!");
///     producer.send_message(message).await?;
///
///     // Send multiple messages in a batch
///     let messages = vec![
///         Producer::create_text_message("Message 1"),
///         Producer::create_text_message("Message 2"),
///     ];
///     producer.send_messages(messages).await?;
///
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct Producer {
    /// Lock-protected sender; `None` once the producer has been disposed.
    sender: Arc<Mutex<Option<ServiceBusSender>>>,
}
44
45impl PartialEq for Producer {
46    fn eq(&self, other: &Self) -> bool {
47        Arc::ptr_eq(&self.sender, &other.sender)
48    }
49}
50
51impl Producer {
52    /// Creates a new Producer wrapping the provided Service Bus sender.
53    ///
54    /// # Arguments
55    ///
56    /// * `sender` - The Azure Service Bus sender to wrap
57    pub fn new(sender: ServiceBusSender) -> Self {
58        Self {
59            sender: Arc::new(Mutex::new(Some(sender))),
60        }
61    }
62
63    /// Sends a single message to the queue.
64    ///
65    /// # Arguments
66    ///
67    /// * `message` - The ServiceBusMessage to send
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the sender has been disposed or if the Service Bus operation fails
72    pub async fn send_message(
73        &mut self,
74        message: ServiceBusMessage,
75    ) -> Result<(), Box<dyn std::error::Error>> {
76        let mut guard = self.sender.lock().await;
77        if let Some(sender) = guard.as_mut() {
78            sender.send_message(message).await?;
79            Ok(())
80        } else {
81            Err("Sender already disposed".into())
82        }
83    }
84
85    /// Sends multiple messages to the queue in a batch operation.
86    ///
87    /// Batch sending is more efficient than sending individual messages
88    /// when you need to send multiple messages at once.
89    ///
90    /// # Arguments
91    ///
92    /// * `messages` - Vector of ServiceBusMessage instances to send
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the sender has been disposed or if the Service Bus operation fails
97    pub async fn send_messages(
98        &mut self,
99        messages: Vec<ServiceBusMessage>,
100    ) -> Result<(), Box<dyn std::error::Error>> {
101        let mut guard = self.sender.lock().await;
102        if let Some(sender) = guard.as_mut() {
103            sender.send_messages(messages).await?;
104            Ok(())
105        } else {
106            Err("Sender already disposed".into())
107        }
108    }
109
110    /// Creates a new message with the given byte array body.
111    ///
112    /// # Arguments
113    ///
114    /// * `body` - The message body as a byte vector
115    ///
116    /// # Returns
117    ///
118    /// A ServiceBusMessage with the specified body
119    pub fn create_message(body: Vec<u8>) -> ServiceBusMessage {
120        ServiceBusMessage::new(body)
121    }
122
123    /// Creates a new message with a string body.
124    ///
125    /// # Arguments
126    ///
127    /// * `text` - The message text content
128    ///
129    /// # Returns
130    ///
131    /// A ServiceBusMessage with the text as the body
132    pub fn create_text_message(text: &str) -> ServiceBusMessage {
133        ServiceBusMessage::new(text.as_bytes().to_vec())
134    }
135
136    /// Creates a new message with a JSON-serialized body.
137    ///
138    /// # Arguments
139    ///
140    /// * `data` - The data to serialize as JSON
141    ///
142    /// # Returns
143    ///
144    /// A ServiceBusMessage with the JSON data as the body
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the data cannot be serialized to JSON
149    pub fn create_json_message<T: serde::Serialize>(
150        data: &T,
151    ) -> Result<ServiceBusMessage, Box<dyn std::error::Error>> {
152        let json_bytes = serde_json::to_vec(data)?;
153        let message = ServiceBusMessage::new(json_bytes);
154        // Set content type to indicate JSON
155        // Note: This depends on the azservicebus API - may need adjustment
156        Ok(message)
157    }
158
159    /// Disposes the underlying Service Bus sender, releasing all resources.
160    ///
161    /// After disposal, all other operations on this Producer will fail.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the disposal operation fails
166    pub async fn dispose(&self) -> Result<(), Box<dyn std::error::Error>> {
167        let mut guard = self.sender.lock().await;
168        if let Some(sender) = guard.take() {
169            sender.dispose().await?;
170        }
171        Ok(())
172    }
173}
174
/// Extension trait for ServiceBusClient to create Producer instances.
///
/// This trait provides a convenient method to create a [`Producer`] directly
/// from a ServiceBusClient without manually creating the sender first.
pub trait ServiceBusClientProducerExt {
    /// Creates a Producer for the specified queue.
    ///
    /// The method is declared as returning `impl Future` rather than as an `async fn`;
    /// note that the signature places no `Send` bound on the returned future.
    ///
    /// # Arguments
    ///
    /// * `queue_name` - Name of the queue to create a producer for
    /// * `options` - Configuration options for the sender
    ///
    /// # Returns
    ///
    /// A future resolving to a Producer instance configured for the specified queue
    ///
    /// # Errors
    ///
    /// Returns an `azure_core::Error` if the sender creation fails
    fn create_producer_for_queue(
        &mut self,
        queue_name: impl Into<String> + Send,
        options: ServiceBusSenderOptions,
    ) -> impl std::future::Future<Output = Result<Producer, azure_core::Error>>;
}
200
201impl<RP> ServiceBusClientProducerExt for ServiceBusClient<RP>
202where
203    RP: azservicebus::ServiceBusRetryPolicy
204        + From<azservicebus::ServiceBusRetryOptions>
205        + Send
206        + Sync
207        + 'static,
208{
209    /// Creates a Producer for the specified queue using this ServiceBusClient.
210    ///
211    /// This method handles the creation of the underlying sender and wraps it
212    /// in a Producer instance for easier usage.
213    async fn create_producer_for_queue(
214        &mut self,
215        queue_name: impl Into<String> + Send,
216        options: ServiceBusSenderOptions,
217    ) -> Result<Producer, azure_core::Error> {
218        let sender = self.create_sender(queue_name, options).await.map_err(|e| {
219            azure_core::Error::message(
220                azure_core::error::ErrorKind::Other,
221                format!("Sender error: {e}"),
222            )
223        })?;
224
225        Ok(Producer::new(sender))
226    }
227}