server/service_bus_manager/
manager.rs

1use super::AzureAdConfig;
2use super::azure_management_client::StatisticsConfig;
3use super::command_handlers::*;
4use super::commands::ServiceBusCommand;
5use super::consumer_manager::ConsumerManager;
6use super::errors::{ServiceBusError, ServiceBusResult};
7use super::producer_manager::ProducerManager;
8use super::queue_statistics_service::QueueStatisticsService;
9use super::responses::ServiceBusResponse;
10use super::types::QueueInfo;
11use crate::bulk_operations::{BulkOperationHandler, types::BatchConfig};
12use azservicebus::{ServiceBusClient, ServiceBusClientOptions, core::BasicRetryPolicy};
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
16/// The main service bus manager that orchestrates all service bus operations.
17///
18/// ServiceBusManager is the central coordinator for all Azure Service Bus operations,
19/// providing a unified interface for queue management, message handling, bulk operations,
20/// and resource management. It uses a command pattern to process operations through
21/// specialized handlers.
22///
23/// # Architecture
24///
25/// The manager is built around specialized command handlers:
26/// - [`QueueCommandHandler`] - Queue switching and statistics
27/// - [`MessageCommandHandler`] - Message retrieval and completion
28/// - [`SendCommandHandler`] - Message sending operations
29/// - [`StatusCommandHandler`] - Connection and status monitoring
30/// - [`BulkCommandHandler`] - Bulk message operations
31/// - [`ResourceCommandHandler`] - Resource discovery and management
32///
33/// # Thread Safety
34///
35/// All internal state is protected by async mutexes, making the manager safe to use
36/// across multiple async tasks and threads.
37///
38/// # Examples
39///
40/// ```no_run
41/// use quetty_server::service_bus_manager::{ServiceBusManager, ServiceBusCommand};
42///
43/// let manager = ServiceBusManager::new(/* configuration */);
44///
45/// // Switch to a queue
46/// let response = manager.execute_command(
47///     ServiceBusCommand::SwitchQueue {
48///         queue_name: "my-queue".to_string(),
49///         queue_type: "Queue".to_string(),
50///     }
51/// ).await;
52///
53/// // Get messages
54/// let response = manager.execute_command(
55///     ServiceBusCommand::PeekMessages {
56///         max_count: 10,
57///         from_sequence: None,
58///     }
59/// ).await;
60/// ```
61pub struct ServiceBusManager {
62    queue_handler: QueueCommandHandler,
63    message_handler: MessageCommandHandler,
64    send_handler: SendCommandHandler,
65    status_handler: StatusCommandHandler,
66    bulk_handler: BulkCommandHandler,
67    resource_handler: ResourceCommandHandler,
68
69    // Shared state
70    consumer_manager: Arc<Mutex<ConsumerManager>>,
71    producer_manager: Arc<Mutex<ProducerManager>>,
72    service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
73
74    // Connection reset capability
75    connection_string: String,
76
77    // Error tracking
78    last_error: Arc<Mutex<Option<String>>>,
79}
80
81impl ServiceBusManager {
82    /// Creates a new ServiceBusManager with the specified configuration.
83    ///
84    /// Initializes all command handlers and shared state components needed for
85    /// Service Bus operations. The manager takes ownership of the provided clients
86    /// and configuration.
87    ///
88    /// # Arguments
89    ///
90    /// * `service_bus_client` - Azure Service Bus client for operations
91    /// * `http_client` - HTTP client for Azure Management API calls
92    /// * `azure_ad_config` - Azure AD configuration for authentication
93    /// * `statistics_config` - Configuration for queue statistics collection
94    /// * `batch_config` - Configuration for bulk operations
95    /// * `connection_string` - Service Bus connection string for reconnection
96    ///
97    /// # Returns
98    ///
99    /// A fully configured ServiceBusManager ready for operations
100    pub fn new(
101        service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
102        http_client: reqwest::Client,
103        azure_ad_config: AzureAdConfig,
104        statistics_config: StatisticsConfig,
105        batch_config: BatchConfig,
106        connection_string: String,
107    ) -> Self {
108        let consumer_manager = Arc::new(Mutex::new(ConsumerManager::new(
109            service_bus_client.clone(),
110            batch_config.clone(),
111        )));
112        let producer_manager = Arc::new(Mutex::new(ProducerManager::new(
113            service_bus_client.clone(),
114            batch_config.clone(),
115        )));
116        let bulk_handler_inner = Arc::new(BulkOperationHandler::new(batch_config.clone()));
117        let statistics_service = Arc::new(QueueStatisticsService::new(
118            http_client.clone(),
119            statistics_config,
120            azure_ad_config,
121        ));
122
123        Self {
124            queue_handler: QueueCommandHandler::new(consumer_manager.clone(), statistics_service),
125            message_handler: MessageCommandHandler::new(consumer_manager.clone()),
126            send_handler: SendCommandHandler::new(producer_manager.clone()),
127            status_handler: StatusCommandHandler::new(
128                consumer_manager.clone(),
129                producer_manager.clone(),
130            ),
131            bulk_handler: BulkCommandHandler::new(
132                bulk_handler_inner,
133                consumer_manager.clone(),
134                producer_manager.clone(),
135                batch_config.clone(),
136            ),
137            resource_handler: ResourceCommandHandler::new(
138                consumer_manager.clone(),
139                producer_manager.clone(),
140            ),
141            consumer_manager,
142            producer_manager,
143            service_bus_client,
144            connection_string,
145            last_error: Arc::new(Mutex::new(None)),
146        }
147    }
148
149    /// Executes a service bus command and returns the response.
150    ///
151    /// This is the main entry point for all Service Bus operations. Commands are
152    /// dispatched to specialized handlers based on their type. Errors are caught
153    /// and converted to error responses.
154    ///
155    /// # Arguments
156    ///
157    /// * `command` - The service bus command to execute
158    ///
159    /// # Returns
160    ///
161    /// A [`ServiceBusResponse`] containing either the operation result or an error
162    ///
163    /// # Examples
164    ///
165    /// ```no_run
166    /// use quetty_server::service_bus_manager::{ServiceBusManager, ServiceBusCommand};
167    ///
168    /// let manager = ServiceBusManager::new(/* args */);
169    ///
170    /// let response = manager.execute_command(
171    ///     ServiceBusCommand::GetQueueStatistics {
172    ///         queue_name: "my-queue".to_string(),
173    ///         queue_type: "Queue".to_string(),
174    ///     }
175    /// ).await;
176    /// ```
177    pub async fn execute_command(&self, command: ServiceBusCommand) -> ServiceBusResponse {
178        log::debug!("Executing command: {command:?}");
179
180        let result = self.handle_command(command).await;
181
182        match result {
183            Ok(response) => {
184                let mut last_error = self.last_error.lock().await;
185                *last_error = None;
186                response
187            }
188            Err(error) => {
189                let mut last_error = self.last_error.lock().await;
190                *last_error = Some(error.to_string());
191                log::error!("Command execution failed: {error}");
192                ServiceBusResponse::Error { error }
193            }
194        }
195    }
196
197    /// Handles a command using specialized command handlers.
198    ///
199    /// Internal method that dispatches commands to the appropriate handler based
200    /// on command type. This method can return errors which are caught and
201    /// converted to error responses by [`execute_command`].
202    ///
203    /// # Arguments
204    ///
205    /// * `command` - The service bus command to handle
206    ///
207    /// # Returns
208    ///
209    /// A [`ServiceBusResult`] containing either the response or an error
210    async fn handle_command(
211        &self,
212        command: ServiceBusCommand,
213    ) -> ServiceBusResult<ServiceBusResponse> {
214        match command {
215            // Queue management commands
216            ServiceBusCommand::SwitchQueue {
217                queue_name,
218                queue_type,
219            } => {
220                self.queue_handler
221                    .handle_switch_queue(queue_name, queue_type)
222                    .await
223            }
224            ServiceBusCommand::GetCurrentQueue => {
225                self.queue_handler.handle_get_current_queue().await
226            }
227            ServiceBusCommand::GetQueueStatistics {
228                queue_name,
229                queue_type,
230            } => {
231                self.queue_handler
232                    .handle_get_queue_statistics(queue_name, queue_type)
233                    .await
234            }
235
236            // Message retrieval commands
237            ServiceBusCommand::PeekMessages {
238                max_count,
239                from_sequence,
240            } => {
241                self.message_handler
242                    .handle_peek_messages(max_count, from_sequence)
243                    .await
244            }
245            ServiceBusCommand::ReceiveMessages { max_count } => {
246                self.message_handler
247                    .handle_receive_messages(max_count)
248                    .await
249            }
250            ServiceBusCommand::CompleteMessage { message_id } => {
251                self.message_handler
252                    .handle_complete_message(message_id)
253                    .await
254            }
255            ServiceBusCommand::AbandonMessage { message_id } => {
256                self.message_handler
257                    .handle_abandon_message(message_id)
258                    .await
259            }
260            ServiceBusCommand::DeadLetterMessage {
261                message_id,
262                reason,
263                error_description,
264            } => {
265                self.message_handler
266                    .handle_dead_letter_message(message_id, reason, error_description)
267                    .await
268            }
269
270            // Bulk operation commands
271            ServiceBusCommand::BulkComplete { message_ids } => {
272                self.bulk_handler.handle_bulk_complete(message_ids).await
273            }
274            ServiceBusCommand::BulkDelete {
275                message_ids,
276                max_position,
277            } => {
278                self.bulk_handler
279                    .handle_bulk_delete(message_ids, max_position)
280                    .await
281            }
282            ServiceBusCommand::BulkAbandon { message_ids } => {
283                self.bulk_handler.handle_bulk_abandon(message_ids).await
284            }
285            ServiceBusCommand::BulkDeadLetter {
286                message_ids,
287                reason,
288                error_description,
289            } => {
290                self.bulk_handler
291                    .handle_bulk_dead_letter(message_ids, reason, error_description)
292                    .await
293            }
294            ServiceBusCommand::BulkSend {
295                message_ids,
296                target_queue,
297                should_delete_source,
298                repeat_count,
299                max_position,
300            } => {
301                self.bulk_handler
302                    .handle_bulk_send(
303                        message_ids,
304                        target_queue,
305                        should_delete_source,
306                        repeat_count,
307                        max_position,
308                    )
309                    .await
310            }
311            ServiceBusCommand::BulkSendPeeked {
312                messages_data,
313                target_queue,
314                repeat_count,
315            } => {
316                self.bulk_handler
317                    .handle_bulk_send_peeked(messages_data, target_queue, repeat_count)
318                    .await
319            }
320
321            // Send operation commands
322            ServiceBusCommand::SendMessage {
323                queue_name,
324                message,
325            } => {
326                self.send_handler
327                    .handle_send_message(queue_name, message)
328                    .await
329            }
330            ServiceBusCommand::SendMessages {
331                queue_name,
332                messages,
333            } => {
334                self.send_handler
335                    .handle_send_messages(queue_name, messages)
336                    .await
337            }
338
339            // Status and health commands
340            ServiceBusCommand::GetConnectionStatus => {
341                self.status_handler.handle_get_connection_status().await
342            }
343            ServiceBusCommand::GetQueueStats { queue_name } => {
344                self.status_handler.handle_get_queue_stats(queue_name).await
345            }
346
347            // Resource management commands
348            ServiceBusCommand::DisposeConsumer => {
349                self.resource_handler.handle_dispose_consumer().await
350            }
351            ServiceBusCommand::DisposeAllResources => {
352                self.resource_handler.handle_dispose_all_resources().await
353            }
354            ServiceBusCommand::ResetConnection => self.handle_reset_connection().await,
355        }
356    }
357
358    // Static methods for Azure AD operations (keep existing functionality)
359    pub async fn get_azure_ad_token(
360        config: &AzureAdConfig,
361    ) -> Result<String, Box<dyn std::error::Error>> {
362        // Create a default HTTP client for backward compatibility
363        let http_client = reqwest::Client::new();
364        config.get_azure_ad_token(&http_client).await
365    }
366
367    pub async fn list_queues_azure_ad(
368        config: &AzureAdConfig,
369    ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
370        // Create a default HTTP client for backward compatibility
371        let http_client = reqwest::Client::new();
372        config.list_queues_azure_ad(&http_client).await
373    }
374
375    pub async fn list_namespaces_azure_ad(
376        config: &AzureAdConfig,
377    ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
378        // Create a default HTTP client for backward compatibility
379        let http_client = reqwest::Client::new();
380        config.list_namespaces_azure_ad(&http_client).await
381    }
382
383    // Helper methods with clean interfaces
384    pub async fn get_current_queue(&self) -> Option<QueueInfo> {
385        let consumer = self.consumer_manager.lock().await;
386        consumer.current_queue().cloned()
387    }
388
389    pub async fn is_connected(&self) -> bool {
390        let consumer = self.consumer_manager.lock().await;
391        let producer = self.producer_manager.lock().await;
392        consumer.is_consumer_ready() || producer.producer_count() > 0
393    }
394
395    pub async fn get_producer_count(&self) -> usize {
396        let producer = self.producer_manager.lock().await;
397        producer.producer_count()
398    }
399
400    pub async fn get_last_error(&self) -> Option<String> {
401        let last_error = self.last_error.lock().await;
402        last_error.clone()
403    }
404
405    /// Reset the entire AMQP connection by creating a new ServiceBusClient
406    pub async fn handle_reset_connection(&self) -> ServiceBusResult<ServiceBusResponse> {
407        log::info!("Resetting ServiceBus connection completely");
408
409        // First dispose all existing resources
410        let _ = self.resource_handler.handle_dispose_all_resources().await;
411
412        // Create a new ServiceBusClient from the stored connection string
413        let new_client = ServiceBusClient::new_from_connection_string(
414            &self.connection_string,
415            ServiceBusClientOptions::default(),
416        )
417        .await
418        .map_err(|e| {
419            ServiceBusError::ConnectionFailed(format!(
420                "Failed to create new ServiceBus client: {e}"
421            ))
422        })?;
423
424        // Replace the client in the Arc<Mutex>
425        {
426            let mut client_guard = self.service_bus_client.lock().await;
427            *client_guard = new_client;
428        }
429
430        // Update the consumer and producer managers with the new client
431        {
432            let mut consumer_manager = self.consumer_manager.lock().await;
433            consumer_manager
434                .reset_client(self.service_bus_client.clone())
435                .await?;
436        }
437
438        {
439            let mut producer_manager = self.producer_manager.lock().await;
440            producer_manager
441                .reset_client(self.service_bus_client.clone())
442                .await?;
443        }
444
445        log::info!("ServiceBus connection reset successfully");
446        Ok(ServiceBusResponse::ConnectionReset)
447    }
448}