server/producer.rs
1use azservicebus::{
2 ServiceBusClient, ServiceBusMessage, ServiceBusSender, ServiceBusSenderOptions,
3};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7/// A wrapper around Azure Service Bus sender for producing messages to queues.
8///
9/// The Producer provides a high-level interface for sending messages to Azure Service Bus queues.
10/// It supports both single message sending and batch operations for improved performance.
11///
12/// # Thread Safety
13///
14/// The Producer is thread-safe and can be shared across async tasks. The underlying
15/// sender is protected by a mutex to ensure safe concurrent access.
16///
17/// # Examples
18///
19/// ```no_run
20/// use quetty_server::producer::Producer;
21/// use azservicebus::{ServiceBusSender, ServiceBusMessage};
22///
23/// async fn example(sender: ServiceBusSender) -> Result<(), Box<dyn std::error::Error>> {
24/// let mut producer = Producer::new(sender);
25///
26/// // Send a single text message
27/// let message = Producer::create_text_message("Hello, world!");
28/// producer.send_message(message).await?;
29///
30/// // Send multiple messages in a batch
31/// let messages = vec![
32/// Producer::create_text_message("Message 1"),
33/// Producer::create_text_message("Message 2"),
34/// ];
35/// producer.send_messages(messages).await?;
36///
37/// Ok(())
38/// }
39/// ```
40#[derive(Debug)]
41pub struct Producer {
42 sender: Arc<Mutex<Option<ServiceBusSender>>>,
43}
44
45impl PartialEq for Producer {
46 fn eq(&self, other: &Self) -> bool {
47 Arc::ptr_eq(&self.sender, &other.sender)
48 }
49}
50
51impl Producer {
52 /// Creates a new Producer wrapping the provided Service Bus sender.
53 ///
54 /// # Arguments
55 ///
56 /// * `sender` - The Azure Service Bus sender to wrap
57 pub fn new(sender: ServiceBusSender) -> Self {
58 Self {
59 sender: Arc::new(Mutex::new(Some(sender))),
60 }
61 }
62
63 /// Sends a single message to the queue.
64 ///
65 /// # Arguments
66 ///
67 /// * `message` - The ServiceBusMessage to send
68 ///
69 /// # Errors
70 ///
71 /// Returns an error if the sender has been disposed or if the Service Bus operation fails
72 pub async fn send_message(
73 &mut self,
74 message: ServiceBusMessage,
75 ) -> Result<(), Box<dyn std::error::Error>> {
76 let mut guard = self.sender.lock().await;
77 if let Some(sender) = guard.as_mut() {
78 sender.send_message(message).await?;
79 Ok(())
80 } else {
81 Err("Sender already disposed".into())
82 }
83 }
84
85 /// Sends multiple messages to the queue in a batch operation.
86 ///
87 /// Batch sending is more efficient than sending individual messages
88 /// when you need to send multiple messages at once.
89 ///
90 /// # Arguments
91 ///
92 /// * `messages` - Vector of ServiceBusMessage instances to send
93 ///
94 /// # Errors
95 ///
96 /// Returns an error if the sender has been disposed or if the Service Bus operation fails
97 pub async fn send_messages(
98 &mut self,
99 messages: Vec<ServiceBusMessage>,
100 ) -> Result<(), Box<dyn std::error::Error>> {
101 let mut guard = self.sender.lock().await;
102 if let Some(sender) = guard.as_mut() {
103 sender.send_messages(messages).await?;
104 Ok(())
105 } else {
106 Err("Sender already disposed".into())
107 }
108 }
109
110 /// Creates a new message with the given byte array body.
111 ///
112 /// # Arguments
113 ///
114 /// * `body` - The message body as a byte vector
115 ///
116 /// # Returns
117 ///
118 /// A ServiceBusMessage with the specified body
119 pub fn create_message(body: Vec<u8>) -> ServiceBusMessage {
120 ServiceBusMessage::new(body)
121 }
122
123 /// Creates a new message with a string body.
124 ///
125 /// # Arguments
126 ///
127 /// * `text` - The message text content
128 ///
129 /// # Returns
130 ///
131 /// A ServiceBusMessage with the text as the body
132 pub fn create_text_message(text: &str) -> ServiceBusMessage {
133 ServiceBusMessage::new(text.as_bytes().to_vec())
134 }
135
136 /// Creates a new message with a JSON-serialized body.
137 ///
138 /// # Arguments
139 ///
140 /// * `data` - The data to serialize as JSON
141 ///
142 /// # Returns
143 ///
144 /// A ServiceBusMessage with the JSON data as the body
145 ///
146 /// # Errors
147 ///
148 /// Returns an error if the data cannot be serialized to JSON
149 pub fn create_json_message<T: serde::Serialize>(
150 data: &T,
151 ) -> Result<ServiceBusMessage, Box<dyn std::error::Error>> {
152 let json_bytes = serde_json::to_vec(data)?;
153 let message = ServiceBusMessage::new(json_bytes);
154 // Set content type to indicate JSON
155 // Note: This depends on the azservicebus API - may need adjustment
156 Ok(message)
157 }
158
159 /// Disposes the underlying Service Bus sender, releasing all resources.
160 ///
161 /// After disposal, all other operations on this Producer will fail.
162 ///
163 /// # Errors
164 ///
165 /// Returns an error if the disposal operation fails
166 pub async fn dispose(&self) -> Result<(), Box<dyn std::error::Error>> {
167 let mut guard = self.sender.lock().await;
168 if let Some(sender) = guard.take() {
169 sender.dispose().await?;
170 }
171 Ok(())
172 }
173}
174
175/// Extension trait for ServiceBusClient to create Producer instances.
176///
177/// This trait provides a convenient method to create a Producer directly
178/// from a ServiceBusClient without manually creating the sender first.
179pub trait ServiceBusClientProducerExt {
180 /// Creates a Producer for the specified queue.
181 ///
182 /// # Arguments
183 ///
184 /// * `queue_name` - Name of the queue to create a producer for
185 /// * `options` - Configuration options for the sender
186 ///
187 /// # Returns
188 ///
189 /// A Producer instance configured for the specified queue
190 ///
191 /// # Errors
192 ///
193 /// Returns an error if the sender creation fails
194 fn create_producer_for_queue(
195 &mut self,
196 queue_name: impl Into<String> + Send,
197 options: ServiceBusSenderOptions,
198 ) -> impl std::future::Future<Output = Result<Producer, azure_core::Error>>;
199}
200
201impl<RP> ServiceBusClientProducerExt for ServiceBusClient<RP>
202where
203 RP: azservicebus::ServiceBusRetryPolicy
204 + From<azservicebus::ServiceBusRetryOptions>
205 + Send
206 + Sync
207 + 'static,
208{
209 /// Creates a Producer for the specified queue using this ServiceBusClient.
210 ///
211 /// This method handles the creation of the underlying sender and wraps it
212 /// in a Producer instance for easier usage.
213 async fn create_producer_for_queue(
214 &mut self,
215 queue_name: impl Into<String> + Send,
216 options: ServiceBusSenderOptions,
217 ) -> Result<Producer, azure_core::Error> {
218 let sender = self.create_sender(queue_name, options).await.map_err(|e| {
219 azure_core::Error::message(
220 azure_core::error::ErrorKind::Other,
221 format!("Sender error: {e}"),
222 )
223 })?;
224
225 Ok(Producer::new(sender))
226 }
227}