server/service_bus_manager/
consumer_manager.rs1use super::errors::{ServiceBusError, ServiceBusResult};
2use super::types::QueueInfo;
3use crate::bulk_operations::types::BatchConfig;
4use crate::consumer::{Consumer, ServiceBusClientExt};
5use crate::model::MessageModel;
6use azservicebus::{ServiceBusClient, ServiceBusReceiverOptions, core::BasicRetryPolicy};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::Mutex;
10
11pub struct ConsumerManager {
12 current_consumer: Option<Arc<Mutex<Consumer>>>,
13 current_queue: Option<QueueInfo>,
14 service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
15 batch_config: BatchConfig,
16}
17
18impl ConsumerManager {
19 pub fn new(
20 service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
21 batch_config: BatchConfig,
22 ) -> Self {
23 Self {
24 current_consumer: None,
25 current_queue: None,
26 service_bus_client,
27 batch_config,
28 }
29 }
30
31 pub async fn switch_queue(&mut self, queue_info: QueueInfo) -> ServiceBusResult<()> {
33 log::info!(
34 "Switching to queue: {} (type: {:?})",
35 queue_info.name,
36 queue_info.queue_type
37 );
38
39 if let Some(current_queue) = &self.current_queue {
41 if current_queue.name == queue_info.name
42 && current_queue.queue_type == queue_info.queue_type
43 {
44 log::debug!("Already connected to queue: {}", queue_info.name);
45 return Ok(());
46 }
47 }
48
49 if let Some(consumer) = &self.current_consumer {
51 log::debug!("Disposing existing consumer");
52 if let Err(e) = consumer.lock().await.dispose().await {
53 log::error!("Failed to dispose existing consumer: {e}");
54 }
56 }
57
58 log::debug!("Creating new consumer for queue: {}", queue_info.name);
60 let mut client = self.service_bus_client.lock().await;
61 let consumer = client
62 .create_consumer_for_queue(
63 queue_info.name.clone(),
64 ServiceBusReceiverOptions::default(),
65 )
66 .await
67 .map_err(|e| {
68 ServiceBusError::ConsumerCreationFailed(format!(
69 "Failed to create consumer for queue {}: {}",
70 queue_info.name, e
71 ))
72 })?;
73
74 self.current_consumer = Some(Arc::new(Mutex::new(consumer)));
76 self.current_queue = Some(queue_info);
77
78 if let Some(queue) = self.current_queue.as_ref() {
79 log::info!("Successfully switched to queue: {}", queue.name);
80 }
81 Ok(())
82 }
83
84 pub fn current_queue(&self) -> Option<&QueueInfo> {
86 self.current_queue.as_ref()
87 }
88
89 pub fn is_consumer_ready(&self) -> bool {
91 self.current_consumer.is_some() && self.current_queue.is_some()
92 }
93
94 pub async fn peek_messages(
96 &self,
97 max_count: u32,
98 from_sequence: Option<i64>,
99 ) -> ServiceBusResult<Vec<MessageModel>> {
100 let consumer = self.get_consumer()?;
101 let mut consumer_guard = consumer.lock().await;
102
103 consumer_guard
104 .peek_messages(max_count, from_sequence)
105 .await
106 .map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))
107 }
108
109 pub async fn receive_messages(
111 &self,
112 max_count: u32,
113 ) -> ServiceBusResult<Vec<azservicebus::ServiceBusReceivedMessage>> {
114 let consumer = self.get_consumer()?;
115 let mut consumer_guard = consumer.lock().await;
116
117 let timeout = Duration::from_secs(self.batch_config.receive_timeout_secs());
119 consumer_guard
120 .receive_messages_with_timeout(max_count, timeout)
121 .await
122 .map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))
123 }
124
125 pub async fn complete_message(
127 &self,
128 message: &azservicebus::ServiceBusReceivedMessage,
129 ) -> ServiceBusResult<()> {
130 let consumer = self.get_consumer()?;
131 let mut consumer_guard = consumer.lock().await;
132
133 consumer_guard
134 .complete_message(message)
135 .await
136 .map_err(|e| ServiceBusError::MessageCompleteFailed(e.to_string()))
137 }
138
139 pub async fn complete_messages(
141 &self,
142 messages: &[azservicebus::ServiceBusReceivedMessage],
143 ) -> ServiceBusResult<()> {
144 let consumer = self.get_consumer()?;
145 let mut consumer_guard = consumer.lock().await;
146
147 consumer_guard
148 .complete_messages(messages)
149 .await
150 .map_err(|e| ServiceBusError::MessageCompleteFailed(e.to_string()))
151 }
152
153 pub async fn abandon_message(
155 &self,
156 message: &azservicebus::ServiceBusReceivedMessage,
157 ) -> ServiceBusResult<()> {
158 let consumer = self.get_consumer()?;
159 let mut consumer_guard = consumer.lock().await;
160
161 consumer_guard
162 .abandon_message(message)
163 .await
164 .map_err(|e| ServiceBusError::MessageAbandonFailed(e.to_string()))
165 }
166
167 pub async fn abandon_messages(
169 &self,
170 messages: &[azservicebus::ServiceBusReceivedMessage],
171 ) -> ServiceBusResult<()> {
172 let consumer = self.get_consumer()?;
173 let mut consumer_guard = consumer.lock().await;
174
175 consumer_guard
176 .abandon_messages(messages)
177 .await
178 .map_err(|e| ServiceBusError::MessageAbandonFailed(e.to_string()))
179 }
180
181 pub async fn dead_letter_message(
183 &self,
184 message: &azservicebus::ServiceBusReceivedMessage,
185 reason: Option<String>,
186 error_description: Option<String>,
187 ) -> ServiceBusResult<()> {
188 let consumer = self.get_consumer()?;
189 let mut consumer_guard = consumer.lock().await;
190
191 consumer_guard
192 .dead_letter_message(message, reason, error_description)
193 .await
194 .map_err(|e| ServiceBusError::MessageDeadLetterFailed(e.to_string()))
195 }
196
197 pub async fn find_message(
199 &self,
200 message_id: &str,
201 sequence_number: i64,
202 max_position: Option<usize>,
203 ) -> ServiceBusResult<Option<azservicebus::ServiceBusReceivedMessage>> {
204 let consumer = self.get_consumer()?;
205 let batch_size = self.batch_config.bulk_chunk_size() as u32;
206 let timeout = Duration::from_secs(self.batch_config.bulk_processing_time_secs());
207 let max_position = max_position.unwrap_or(self.batch_config.max_messages_to_process());
208
209 log::info!(
210 "Searching for message {message_id} (sequence: {sequence_number}) in batches of {batch_size} up to position {max_position}"
211 );
212
213 let mut processed_count = 0;
214
215 while processed_count < max_position {
216 log::debug!(
217 "Search batch: fetching {batch_size} messages (processed: {processed_count}/{max_position})"
218 );
219
220 let messages = {
221 let mut consumer_guard = consumer.lock().await;
222 consumer_guard
223 .receive_messages_with_timeout(batch_size, timeout)
224 .await
225 .map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))?
226 };
227
228 if messages.is_empty() {
229 log::debug!("No more messages available - stopping search");
230 break;
231 }
232
233 processed_count += messages.len();
234
235 for message in messages.into_iter() {
237 let msg_id = message.message_id().unwrap_or_default();
238 let msg_seq = message.sequence_number();
239
240 if msg_id == message_id && msg_seq == sequence_number {
241 log::info!(
242 "Found target message {message_id} (sequence: {sequence_number}) after processing {processed_count} messages"
243 );
244 return Ok(Some(message));
245 }
246
247 let mut consumer_guard = consumer.lock().await;
249 if let Err(e) = consumer_guard.abandon_message(&message).await {
250 log::warn!("Failed to abandon non-target message: {e}");
251 }
252 }
253 }
254
255 log::info!(
256 "Message {message_id} (sequence: {sequence_number}) not found after processing {processed_count} messages (max: {max_position})"
257 );
258 Ok(None)
259 }
260
261 pub async fn dispose_consumer(&mut self) -> ServiceBusResult<()> {
263 if let Some(consumer) = self.current_consumer.take() {
264 log::info!("Disposing consumer for queue: {:?}", self.current_queue);
265 consumer.lock().await.dispose().await.map_err(|e| {
266 ServiceBusError::InternalError(format!("Failed to dispose consumer: {e}"))
267 })?;
268 }
269 self.current_queue = None;
270 Ok(())
271 }
272
273 fn get_consumer(&self) -> ServiceBusResult<Arc<Mutex<Consumer>>> {
275 self.current_consumer
276 .clone()
277 .ok_or(ServiceBusError::ConsumerNotFound)
278 }
279
280 pub fn get_raw_consumer(&self) -> Option<Arc<Mutex<Consumer>>> {
282 self.current_consumer.clone()
283 }
284
285 pub async fn reset_client(
287 &mut self,
288 new_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
289 ) -> ServiceBusResult<()> {
290 log::info!("Resetting ServiceBusClient reference in ConsumerManager");
291
292 self.dispose_consumer().await?;
294
295 self.service_bus_client = new_client;
297
298 log::info!("ConsumerManager client reference updated successfully");
299 Ok(())
300 }
301}