server/service_bus_manager/
queue_statistics_service.rs1use super::ServiceBusError;
2use super::azure_management_client::{AzureManagementClient, StatisticsConfig};
3use super::types::QueueType;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7pub struct QueueStatisticsService {
9 management_client: Arc<Mutex<Option<AzureManagementClient>>>,
10 config: StatisticsConfig,
11 azure_ad_config: super::AzureAdConfig,
12 initialized: Arc<Mutex<bool>>,
13 http_client: reqwest::Client,
14}
15
16impl QueueStatisticsService {
17 pub fn new(
19 http_client: reqwest::Client,
20 config: StatisticsConfig,
21 azure_ad_config: super::AzureAdConfig,
22 ) -> Self {
23 Self {
24 management_client: Arc::new(Mutex::new(None)),
25 config,
26 azure_ad_config,
27 initialized: Arc::new(Mutex::new(false)),
28 http_client,
29 }
30 }
31
32 async fn ensure_initialized(&self) {
34 let mut initialized = self.initialized.lock().await;
35 if *initialized {
36 return;
37 }
38
39 if self.config.use_management_api && self.azure_ad_config.auth_method != "connection_string"
40 {
41 match AzureManagementClient::from_config(
42 self.http_client.clone(),
43 self.azure_ad_config.clone(),
44 ) {
45 Ok(client) => {
46 log::info!("Azure Management API client initialized successfully");
47 let mut client_lock = self.management_client.lock().await;
48 *client_lock = Some(client);
49 }
50 Err(e) => {
51 log::warn!(
52 "Failed to initialize Azure Management API client: {e}. Queue statistics will not be available.",
53 );
54 }
55 }
56 }
57
58 *initialized = true;
59 }
60
61 pub async fn get_queue_statistics(
63 &self,
64 queue_name: &str,
65 queue_type: &QueueType,
66 ) -> Option<u64> {
67 if !self.config.display_enabled {
68 log::debug!("Queue statistics display is disabled");
69 return None;
70 }
71
72 self.ensure_initialized().await;
74
75 let client_lock = self.management_client.lock().await;
76 let client = match &*client_lock {
77 Some(client) => client,
78 None => {
79 log::debug!("Management API client not available");
80 return None;
81 }
82 };
83
84 log::info!("Getting statistics for queue: {queue_name} (type: {queue_type:?})");
86
87 match client.get_queue_counts(queue_name).await {
88 Ok((active, dlq)) => {
89 let count = match queue_type {
90 QueueType::Main => active,
91 QueueType::DeadLetter => dlq,
92 };
93 log::debug!(
94 "Retrieved counts - active: {active}, dlq: {dlq}. Returning {count} for {queue_type:?} queue"
95 );
96 Some(count)
97 }
98 Err(ServiceBusError::InternalError(msg)) if msg.contains("Queue not found") => {
99 log::warn!("Queue not found: {queue_name}");
100 None
101 }
102 Err(ServiceBusError::AuthenticationError(msg)) => {
103 log::warn!("Authentication failed for management API: {msg}");
104 None
105 }
106 Err(e) => {
107 log::warn!("Failed to get queue statistics: {e}");
108 None
109 }
110 }
111 }
112
113 pub async fn is_available(&self) -> bool {
115 if !self.config.display_enabled {
116 return false;
117 }
118
119 self.ensure_initialized().await;
121 let client_lock = self.management_client.lock().await;
122 client_lock.is_some()
123 }
124
125 pub async fn get_both_queue_counts(&self, queue_name: &str) -> (Option<u64>, Option<u64>) {
127 if !self.config.display_enabled {
128 log::debug!("Queue statistics display is disabled");
129 return (None, None);
130 }
131
132 self.ensure_initialized().await;
134
135 let client_lock = self.management_client.lock().await;
136 let client = match &*client_lock {
137 Some(client) => client,
138 None => {
139 log::debug!("Management API client not available");
140 return (None, None);
141 }
142 };
143
144 log::info!("Getting both counts for queue: {queue_name}");
146
147 match client.get_queue_counts(queue_name).await {
148 Ok((active, dlq)) => {
149 log::debug!("Retrieved counts - active: {active}, dlq: {dlq}");
150 (Some(active), Some(dlq))
151 }
152 Err(ServiceBusError::InternalError(msg)) if msg.contains("Queue not found") => {
153 log::warn!("Queue not found: {queue_name}");
154 (None, None)
155 }
156 Err(ServiceBusError::AuthenticationError(msg)) => {
157 log::warn!("Authentication failed for management API: {msg}");
158 (None, None)
159 }
160 Err(e) => {
161 log::warn!("Failed to get queue statistics: {e}");
162 (None, None)
163 }
164 }
165 }
166
167 pub fn config(&self) -> &StatisticsConfig {
169 &self.config
170 }
171}