// server/service_bus_manager/queue_statistics_service.rs

1use super::ServiceBusError;
2use super::azure_management_client::{AzureManagementClient, StatisticsConfig};
3use super::types::QueueType;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7/// Service for getting real queue statistics from Azure Management API
8pub struct QueueStatisticsService {
9    management_client: Arc<Mutex<Option<AzureManagementClient>>>,
10    config: StatisticsConfig,
11    azure_ad_config: super::AzureAdConfig,
12    initialized: Arc<Mutex<bool>>,
13    http_client: reqwest::Client,
14}
15
16impl QueueStatisticsService {
17    /// Create a new queue statistics service
18    pub fn new(
19        http_client: reqwest::Client,
20        config: StatisticsConfig,
21        azure_ad_config: super::AzureAdConfig,
22    ) -> Self {
23        Self {
24            management_client: Arc::new(Mutex::new(None)),
25            config,
26            azure_ad_config,
27            initialized: Arc::new(Mutex::new(false)),
28            http_client,
29        }
30    }
31
32    /// Initialize the management client lazily on first use
33    async fn ensure_initialized(&self) {
34        let mut initialized = self.initialized.lock().await;
35        if *initialized {
36            return;
37        }
38
39        if self.config.use_management_api && self.azure_ad_config.auth_method != "connection_string"
40        {
41            match AzureManagementClient::from_config(
42                self.http_client.clone(),
43                self.azure_ad_config.clone(),
44            ) {
45                Ok(client) => {
46                    log::info!("Azure Management API client initialized successfully");
47                    let mut client_lock = self.management_client.lock().await;
48                    *client_lock = Some(client);
49                }
50                Err(e) => {
51                    log::warn!(
52                        "Failed to initialize Azure Management API client: {e}. Queue statistics will not be available.",
53                    );
54                }
55            }
56        }
57
58        *initialized = true;
59    }
60
61    /// Get real queue statistics from Azure Management API
62    pub async fn get_queue_statistics(
63        &self,
64        queue_name: &str,
65        queue_type: &QueueType,
66    ) -> Option<u64> {
67        if !self.config.display_enabled {
68            log::debug!("Queue statistics display is disabled");
69            return None;
70        }
71
72        // Ensure the client is initialized
73        self.ensure_initialized().await;
74
75        let client_lock = self.management_client.lock().await;
76        let client = match &*client_lock {
77            Some(client) => client,
78            None => {
79                log::debug!("Management API client not available");
80                return None;
81            }
82        };
83
84        // Fetch counts from management API for the main queue name
85        log::info!("Getting statistics for queue: {queue_name} (type: {queue_type:?})");
86
87        match client.get_queue_counts(queue_name).await {
88            Ok((active, dlq)) => {
89                let count = match queue_type {
90                    QueueType::Main => active,
91                    QueueType::DeadLetter => dlq,
92                };
93                log::debug!(
94                    "Retrieved counts - active: {active}, dlq: {dlq}. Returning {count} for {queue_type:?} queue"
95                );
96                Some(count)
97            }
98            Err(ServiceBusError::InternalError(msg)) if msg.contains("Queue not found") => {
99                log::warn!("Queue not found: {queue_name}");
100                None
101            }
102            Err(ServiceBusError::AuthenticationError(msg)) => {
103                log::warn!("Authentication failed for management API: {msg}");
104                None
105            }
106            Err(e) => {
107                log::warn!("Failed to get queue statistics: {e}");
108                None
109            }
110        }
111    }
112
113    /// Check if the service is properly configured and ready
114    pub async fn is_available(&self) -> bool {
115        if !self.config.display_enabled {
116            return false;
117        }
118
119        // Check if we have a client after initialization
120        self.ensure_initialized().await;
121        let client_lock = self.management_client.lock().await;
122        client_lock.is_some()
123    }
124
125    /// Get both active and dead letter counts from Azure Management API
126    pub async fn get_both_queue_counts(&self, queue_name: &str) -> (Option<u64>, Option<u64>) {
127        if !self.config.display_enabled {
128            log::debug!("Queue statistics display is disabled");
129            return (None, None);
130        }
131
132        // Ensure the client is initialized
133        self.ensure_initialized().await;
134
135        let client_lock = self.management_client.lock().await;
136        let client = match &*client_lock {
137            Some(client) => client,
138            None => {
139                log::debug!("Management API client not available");
140                return (None, None);
141            }
142        };
143
144        // Fetch counts from management API for the main queue name
145        log::info!("Getting both counts for queue: {queue_name}");
146
147        match client.get_queue_counts(queue_name).await {
148            Ok((active, dlq)) => {
149                log::debug!("Retrieved counts - active: {active}, dlq: {dlq}");
150                (Some(active), Some(dlq))
151            }
152            Err(ServiceBusError::InternalError(msg)) if msg.contains("Queue not found") => {
153                log::warn!("Queue not found: {queue_name}");
154                (None, None)
155            }
156            Err(ServiceBusError::AuthenticationError(msg)) => {
157                log::warn!("Authentication failed for management API: {msg}");
158                (None, None)
159            }
160            Err(e) => {
161                log::warn!("Failed to get queue statistics: {e}");
162                (None, None)
163            }
164        }
165    }
166
167    /// Get the current configuration
168    pub fn config(&self) -> &StatisticsConfig {
169        &self.config
170    }
171}