server/service_bus_manager/
manager.rs1use super::AzureAdConfig;
2use super::azure_management_client::StatisticsConfig;
3use super::command_handlers::*;
4use super::commands::ServiceBusCommand;
5use super::consumer_manager::ConsumerManager;
6use super::errors::{ServiceBusError, ServiceBusResult};
7use super::producer_manager::ProducerManager;
8use super::queue_statistics_service::QueueStatisticsService;
9use super::responses::ServiceBusResponse;
10use super::types::QueueInfo;
11use crate::bulk_operations::{BulkOperationHandler, types::BatchConfig};
12use azservicebus::{ServiceBusClient, ServiceBusClientOptions, core::BasicRetryPolicy};
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
16pub struct ServiceBusManager {
62 queue_handler: QueueCommandHandler,
63 message_handler: MessageCommandHandler,
64 send_handler: SendCommandHandler,
65 status_handler: StatusCommandHandler,
66 bulk_handler: BulkCommandHandler,
67 resource_handler: ResourceCommandHandler,
68
69 consumer_manager: Arc<Mutex<ConsumerManager>>,
71 producer_manager: Arc<Mutex<ProducerManager>>,
72 service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
73
74 connection_string: String,
76
77 last_error: Arc<Mutex<Option<String>>>,
79}
80
81impl ServiceBusManager {
82 pub fn new(
101 service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
102 http_client: reqwest::Client,
103 azure_ad_config: AzureAdConfig,
104 statistics_config: StatisticsConfig,
105 batch_config: BatchConfig,
106 connection_string: String,
107 ) -> Self {
108 let consumer_manager = Arc::new(Mutex::new(ConsumerManager::new(
109 service_bus_client.clone(),
110 batch_config.clone(),
111 )));
112 let producer_manager = Arc::new(Mutex::new(ProducerManager::new(
113 service_bus_client.clone(),
114 batch_config.clone(),
115 )));
116 let bulk_handler_inner = Arc::new(BulkOperationHandler::new(batch_config.clone()));
117 let statistics_service = Arc::new(QueueStatisticsService::new(
118 http_client.clone(),
119 statistics_config,
120 azure_ad_config,
121 ));
122
123 Self {
124 queue_handler: QueueCommandHandler::new(consumer_manager.clone(), statistics_service),
125 message_handler: MessageCommandHandler::new(consumer_manager.clone()),
126 send_handler: SendCommandHandler::new(producer_manager.clone()),
127 status_handler: StatusCommandHandler::new(
128 consumer_manager.clone(),
129 producer_manager.clone(),
130 ),
131 bulk_handler: BulkCommandHandler::new(
132 bulk_handler_inner,
133 consumer_manager.clone(),
134 producer_manager.clone(),
135 batch_config.clone(),
136 ),
137 resource_handler: ResourceCommandHandler::new(
138 consumer_manager.clone(),
139 producer_manager.clone(),
140 ),
141 consumer_manager,
142 producer_manager,
143 service_bus_client,
144 connection_string,
145 last_error: Arc::new(Mutex::new(None)),
146 }
147 }
148
149 pub async fn execute_command(&self, command: ServiceBusCommand) -> ServiceBusResponse {
178 log::debug!("Executing command: {command:?}");
179
180 let result = self.handle_command(command).await;
181
182 match result {
183 Ok(response) => {
184 let mut last_error = self.last_error.lock().await;
185 *last_error = None;
186 response
187 }
188 Err(error) => {
189 let mut last_error = self.last_error.lock().await;
190 *last_error = Some(error.to_string());
191 log::error!("Command execution failed: {error}");
192 ServiceBusResponse::Error { error }
193 }
194 }
195 }
196
197 async fn handle_command(
211 &self,
212 command: ServiceBusCommand,
213 ) -> ServiceBusResult<ServiceBusResponse> {
214 match command {
215 ServiceBusCommand::SwitchQueue {
217 queue_name,
218 queue_type,
219 } => {
220 self.queue_handler
221 .handle_switch_queue(queue_name, queue_type)
222 .await
223 }
224 ServiceBusCommand::GetCurrentQueue => {
225 self.queue_handler.handle_get_current_queue().await
226 }
227 ServiceBusCommand::GetQueueStatistics {
228 queue_name,
229 queue_type,
230 } => {
231 self.queue_handler
232 .handle_get_queue_statistics(queue_name, queue_type)
233 .await
234 }
235
236 ServiceBusCommand::PeekMessages {
238 max_count,
239 from_sequence,
240 } => {
241 self.message_handler
242 .handle_peek_messages(max_count, from_sequence)
243 .await
244 }
245 ServiceBusCommand::ReceiveMessages { max_count } => {
246 self.message_handler
247 .handle_receive_messages(max_count)
248 .await
249 }
250 ServiceBusCommand::CompleteMessage { message_id } => {
251 self.message_handler
252 .handle_complete_message(message_id)
253 .await
254 }
255 ServiceBusCommand::AbandonMessage { message_id } => {
256 self.message_handler
257 .handle_abandon_message(message_id)
258 .await
259 }
260 ServiceBusCommand::DeadLetterMessage {
261 message_id,
262 reason,
263 error_description,
264 } => {
265 self.message_handler
266 .handle_dead_letter_message(message_id, reason, error_description)
267 .await
268 }
269
270 ServiceBusCommand::BulkComplete { message_ids } => {
272 self.bulk_handler.handle_bulk_complete(message_ids).await
273 }
274 ServiceBusCommand::BulkDelete {
275 message_ids,
276 max_position,
277 } => {
278 self.bulk_handler
279 .handle_bulk_delete(message_ids, max_position)
280 .await
281 }
282 ServiceBusCommand::BulkAbandon { message_ids } => {
283 self.bulk_handler.handle_bulk_abandon(message_ids).await
284 }
285 ServiceBusCommand::BulkDeadLetter {
286 message_ids,
287 reason,
288 error_description,
289 } => {
290 self.bulk_handler
291 .handle_bulk_dead_letter(message_ids, reason, error_description)
292 .await
293 }
294 ServiceBusCommand::BulkSend {
295 message_ids,
296 target_queue,
297 should_delete_source,
298 repeat_count,
299 max_position,
300 } => {
301 self.bulk_handler
302 .handle_bulk_send(
303 message_ids,
304 target_queue,
305 should_delete_source,
306 repeat_count,
307 max_position,
308 )
309 .await
310 }
311 ServiceBusCommand::BulkSendPeeked {
312 messages_data,
313 target_queue,
314 repeat_count,
315 } => {
316 self.bulk_handler
317 .handle_bulk_send_peeked(messages_data, target_queue, repeat_count)
318 .await
319 }
320
321 ServiceBusCommand::SendMessage {
323 queue_name,
324 message,
325 } => {
326 self.send_handler
327 .handle_send_message(queue_name, message)
328 .await
329 }
330 ServiceBusCommand::SendMessages {
331 queue_name,
332 messages,
333 } => {
334 self.send_handler
335 .handle_send_messages(queue_name, messages)
336 .await
337 }
338
339 ServiceBusCommand::GetConnectionStatus => {
341 self.status_handler.handle_get_connection_status().await
342 }
343 ServiceBusCommand::GetQueueStats { queue_name } => {
344 self.status_handler.handle_get_queue_stats(queue_name).await
345 }
346
347 ServiceBusCommand::DisposeConsumer => {
349 self.resource_handler.handle_dispose_consumer().await
350 }
351 ServiceBusCommand::DisposeAllResources => {
352 self.resource_handler.handle_dispose_all_resources().await
353 }
354 ServiceBusCommand::ResetConnection => self.handle_reset_connection().await,
355 }
356 }
357
358 pub async fn get_azure_ad_token(
360 config: &AzureAdConfig,
361 ) -> Result<String, Box<dyn std::error::Error>> {
362 let http_client = reqwest::Client::new();
364 config.get_azure_ad_token(&http_client).await
365 }
366
367 pub async fn list_queues_azure_ad(
368 config: &AzureAdConfig,
369 ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
370 let http_client = reqwest::Client::new();
372 config.list_queues_azure_ad(&http_client).await
373 }
374
375 pub async fn list_namespaces_azure_ad(
376 config: &AzureAdConfig,
377 ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
378 let http_client = reqwest::Client::new();
380 config.list_namespaces_azure_ad(&http_client).await
381 }
382
383 pub async fn get_current_queue(&self) -> Option<QueueInfo> {
385 let consumer = self.consumer_manager.lock().await;
386 consumer.current_queue().cloned()
387 }
388
389 pub async fn is_connected(&self) -> bool {
390 let consumer = self.consumer_manager.lock().await;
391 let producer = self.producer_manager.lock().await;
392 consumer.is_consumer_ready() || producer.producer_count() > 0
393 }
394
395 pub async fn get_producer_count(&self) -> usize {
396 let producer = self.producer_manager.lock().await;
397 producer.producer_count()
398 }
399
400 pub async fn get_last_error(&self) -> Option<String> {
401 let last_error = self.last_error.lock().await;
402 last_error.clone()
403 }
404
405 pub async fn handle_reset_connection(&self) -> ServiceBusResult<ServiceBusResponse> {
407 log::info!("Resetting ServiceBus connection completely");
408
409 let _ = self.resource_handler.handle_dispose_all_resources().await;
411
412 let new_client = ServiceBusClient::new_from_connection_string(
414 &self.connection_string,
415 ServiceBusClientOptions::default(),
416 )
417 .await
418 .map_err(|e| {
419 ServiceBusError::ConnectionFailed(format!(
420 "Failed to create new ServiceBus client: {e}"
421 ))
422 })?;
423
424 {
426 let mut client_guard = self.service_bus_client.lock().await;
427 *client_guard = new_client;
428 }
429
430 {
432 let mut consumer_manager = self.consumer_manager.lock().await;
433 consumer_manager
434 .reset_client(self.service_bus_client.clone())
435 .await?;
436 }
437
438 {
439 let mut producer_manager = self.producer_manager.lock().await;
440 producer_manager
441 .reset_client(self.service_bus_client.clone())
442 .await?;
443 }
444
445 log::info!("ServiceBus connection reset successfully");
446 Ok(ServiceBusResponse::ConnectionReset)
447 }
448}