server/service_bus_manager.rs
1//! # Service Bus Manager Module
2//!
3//! This module provides comprehensive Azure Service Bus management capabilities,
4//! including queue operations, message handling, authentication, and Azure resource discovery.
5//!
6//! ## Core Components
7//!
8//! - [`ServiceBusManager`] - Main interface for Service Bus operations
9//! - [`AzureManagementClient`] - Azure Resource Manager integration
10//! - [`ServiceBusCommand`] / [`ServiceBusResponse`] - Command/response pattern for operations
11//! - [`ServiceBusError`] - Comprehensive error handling
12//!
13//! ## Features
14//!
15//! - **Queue Management** - Create, list, and manage Service Bus queues
16//! - **Message Operations** - Send, receive, and bulk process messages
17//! - **Authentication** - Multiple auth methods (Device Code, Client Credentials, Connection String)
18//! - **Resource Discovery** - Discover Azure subscriptions, resource groups, and namespaces
19//! - **Statistics** - Queue statistics and monitoring
20//! - **Bulk Operations** - Efficient bulk message processing
21//!
22//! ## Usage
23//!
24//! ```no_run
25//! use quetty_server::service_bus_manager::{ServiceBusManager, AzureAdConfig};
26//!
27//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
28//! let config = AzureAdConfig::default();
29//! let manager = ServiceBusManager::new(config).await?;
30//!
31//! // List available queues
32//! let queues = manager.list_queues().await?;
33//!
34//! // Connect to a specific queue
35//! manager.connect_to_queue("my-queue").await?;
36//!
37//! Ok(())
38//! }
39//! ```
40
41pub use self::azure_management_client::{
42 AccessKeys, AzureManagementClient, NamespaceProperties, ResourceGroup, ServiceBusNamespace,
43 Subscription,
44};
45pub use self::commands::ServiceBusCommand;
46pub use self::errors::{ServiceBusError, ServiceBusResult};
47pub use self::manager::ServiceBusManager;
48pub use self::responses::ServiceBusResponse;
49pub use self::types::*;
50
51/// Azure Management Client for resource discovery and management
52pub mod azure_management_client;
53/// Command handlers for processing Service Bus operations
54pub mod command_handlers;
55/// Command definitions for Service Bus operations
56pub mod commands;
57/// Consumer management for message reception
58pub mod consumer_manager;
59/// Error types and handling for Service Bus operations
60pub mod errors;
61/// Main Service Bus Manager implementation
62pub mod manager;
63/// Producer management for message sending
64pub mod producer_manager;
65/// Queue statistics and monitoring services
66pub mod queue_statistics_service;
67/// Response types for Service Bus operations
68pub mod responses;
69/// Core types and data structures
70pub mod types;
71
72use crate::utils::env::EnvUtils;
73
74/// Configuration for Azure Active Directory authentication and resource access.
75///
76/// This struct contains all the necessary information for authenticating with Azure AD
77/// and accessing Azure Service Bus resources. It supports multiple authentication methods
78/// and can load configuration from both direct values and environment variables.
79///
80/// # Authentication Methods
81///
82/// - `device_code` - Interactive device code flow (default for CLI usage)
83/// - `client_secret` - Service principal authentication
84/// - `connection_string` - Direct connection string authentication
85///
86/// # Examples
87///
88/// ```no_run
89/// use quetty_server::service_bus_manager::AzureAdConfig;
90///
91/// let config = AzureAdConfig {
92/// auth_method: "device_code".to_string(),
93/// tenant_id: Some("tenant-id".to_string()),
94/// client_id: Some("client-id".to_string()),
95/// subscription_id: Some("subscription-id".to_string()),
96/// resource_group: Some("resource-group".to_string()),
97/// namespace: Some("servicebus-namespace".to_string()),
98/// ..Default::default()
99/// };
100/// ```
101#[derive(Clone, Debug, serde::Deserialize, Default)]
102pub struct AzureAdConfig {
103 /// Authentication method: "device_code", "client_secret", or "connection_string"
104 #[serde(default = "default_auth_method")]
105 pub auth_method: String,
106 /// Azure AD tenant ID
107 pub tenant_id: Option<String>,
108 /// Azure AD application (client) ID
109 pub client_id: Option<String>,
110 /// Azure AD application client secret (required for client_secret)
111 pub client_secret: Option<String>,
112 /// Azure subscription ID for resource discovery
113 pub subscription_id: Option<String>,
114 /// Resource group name containing the Service Bus namespace
115 pub resource_group: Option<String>,
116 /// Service Bus namespace name
117 pub namespace: Option<String>,
118}
119
120fn default_auth_method() -> String {
121 "connection_string".to_string()
122}
123
124impl AzureAdConfig {
125 /// Gets the Azure AD tenant ID, returning an error if not configured.
126 ///
127 /// # Returns
128 ///
129 /// The tenant ID as a string reference
130 ///
131 /// # Errors
132 ///
133 /// Returns [`ServiceBusError::ConfigurationError`] if the tenant ID is not set
134 pub fn tenant_id(&self) -> Result<&str, ServiceBusError> {
135 self.tenant_id.as_deref()
136 .ok_or_else(|| ServiceBusError::ConfigurationError(
137 "AZURE_AD__TENANT_ID is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
138 ))
139 }
140
141 /// Gets the Azure AD client ID, returning an error if not configured.
142 ///
143 /// # Returns
144 ///
145 /// The client ID as a string reference
146 ///
147 /// # Errors
148 ///
149 /// Returns [`ServiceBusError::ConfigurationError`] if the client ID is not set
150 pub fn client_id(&self) -> Result<&str, ServiceBusError> {
151 self.client_id.as_deref()
152 .ok_or_else(|| ServiceBusError::ConfigurationError(
153 "AZURE_AD__CLIENT_ID is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
154 ))
155 }
156
157 /// Gets the Azure AD client secret, returning an error if not configured.
158 ///
159 /// Required for client credentials authentication flow.
160 ///
161 /// # Returns
162 ///
163 /// The client secret as a string reference
164 ///
165 /// # Errors
166 ///
167 /// Returns [`ServiceBusError::ConfigurationError`] if the client secret is not set
168 pub fn client_secret(&self) -> Result<String, ServiceBusError> {
169 if let Some(ref secret) = self.client_secret {
170 Ok(secret.clone())
171 } else {
172 EnvUtils::get_validated_var("AZURE_AD__CLIENT_SECRET")
173 .map_err(|_| ServiceBusError::ConfigurationError(
174 "AZURE_AD__CLIENT_SECRET is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
175 ))
176 }
177 }
178
179 /// Gets the Azure subscription ID from config or environment variables.
180 ///
181 /// Checks the config first, then falls back to the `AZURE_AD__SUBSCRIPTION_ID` environment variable.
182 ///
183 /// # Returns
184 ///
185 /// The subscription ID as a string
186 ///
187 /// # Errors
188 ///
189 /// Returns [`ServiceBusError::ConfigurationError`] if the subscription ID is not found
190 pub fn subscription_id(&self) -> Result<String, ServiceBusError> {
191 if let Some(ref id) = self.subscription_id {
192 Ok(id.clone())
193 } else {
194 EnvUtils::get_validated_var("AZURE_AD__SUBSCRIPTION_ID")
195 .map_err(|_| ServiceBusError::ConfigurationError(
196 "AZURE_AD__SUBSCRIPTION_ID is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
197 ))
198 }
199 }
200
201 /// Gets the Azure resource group name from config or environment variables.
202 ///
203 /// Checks the config first, then falls back to the `AZURE_AD__RESOURCE_GROUP` environment variable.
204 ///
205 /// # Returns
206 ///
207 /// The resource group name as a string
208 ///
209 /// # Errors
210 ///
211 /// Returns [`ServiceBusError::ConfigurationError`] if the resource group is not found
212 pub fn resource_group(&self) -> Result<String, ServiceBusError> {
213 if let Some(ref group) = self.resource_group {
214 Ok(group.clone())
215 } else {
216 EnvUtils::get_validated_var("AZURE_AD__RESOURCE_GROUP")
217 .map_err(|_| ServiceBusError::ConfigurationError(
218 "AZURE_AD__RESOURCE_GROUP is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
219 ))
220 }
221 }
222
223 /// Gets the Service Bus namespace name from config or environment variables.
224 ///
225 /// Checks the config first, then falls back to the `AZURE_AD__NAMESPACE` environment variable.
226 ///
227 /// # Returns
228 ///
229 /// The namespace name as a string
230 ///
231 /// # Errors
232 ///
233 /// Returns [`ServiceBusError::ConfigurationError`] if the namespace is not found
234 pub fn namespace(&self) -> Result<String, ServiceBusError> {
235 if let Some(ref ns) = self.namespace {
236 Ok(ns.clone())
237 } else {
238 EnvUtils::get_validated_var("AZURE_AD__NAMESPACE")
239 .map_err(|_| ServiceBusError::ConfigurationError(
240 "AZURE_AD__NAMESPACE is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
241 ))
242 }
243 }
244
245 /// Checks if tenant ID is configured (in config only, not environment).
246 ///
247 /// # Returns
248 ///
249 /// `true` if tenant ID is set in the configuration
250 pub fn has_tenant_id(&self) -> bool {
251 self.tenant_id.is_some()
252 }
253
254 /// Checks if client ID is configured (in config only, not environment).
255 ///
256 /// # Returns
257 ///
258 /// `true` if client ID is set in the configuration
259 pub fn has_client_id(&self) -> bool {
260 self.client_id.is_some()
261 }
262
263 /// Checks if client secret is configured (in config only, not environment).
264 ///
265 /// # Returns
266 ///
267 /// `true` if client secret is set in the configuration
268 pub fn has_client_secret(&self) -> bool {
269 self.client_secret.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__CLIENT_SECRET")
270 }
271
272 /// Checks if subscription ID is available (config or environment).
273 ///
274 /// # Returns
275 ///
276 /// `true` if subscription ID is available from config or environment variables
277 pub fn has_subscription_id(&self) -> bool {
278 self.subscription_id.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__SUBSCRIPTION_ID")
279 }
280
281 /// Checks if resource group is available (config or environment).
282 ///
283 /// # Returns
284 ///
285 /// `true` if resource group is available from config or environment variables
286 pub fn has_resource_group(&self) -> bool {
287 self.resource_group.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__RESOURCE_GROUP")
288 }
289
290 /// Checks if namespace is available (config or environment).
291 ///
292 /// # Returns
293 ///
294 /// `true` if namespace is available from config or environment variables
295 pub fn has_namespace(&self) -> bool {
296 self.namespace.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__NAMESPACE")
297 }
298
299 /// Obtains an Azure AD access token using the configured authentication method.
300 ///
301 /// This method tries different authentication approaches based on the configured auth method:
302 /// 1. For device code flow, attempts UI-integrated auth first
303 /// 2. Falls back to regular auth provider for other methods
304 /// 3. Returns an error for connection string auth (no Azure AD token available)
305 ///
306 /// # Arguments
307 ///
308 /// * `http_client` - HTTP client for making authentication requests
309 ///
310 /// # Returns
311 ///
312 /// An Azure AD access token string
313 ///
314 /// # Errors
315 ///
316 /// Returns an error if:
317 /// - Authentication fails
318 /// - Connection string auth is used (Azure AD tokens not available)
319 /// - Required configuration is missing
320 pub async fn get_azure_ad_token(
321 &self,
322 http_client: &reqwest::Client,
323 ) -> Result<String, Box<dyn std::error::Error>> {
324 use crate::auth::{
325 create_auth_provider, create_service_bus_auth_provider, get_azure_ad_token_with_auth,
326 };
327
328 // If device code flow is configured, try to use UI-integrated auth first
329 if self.auth_method == "device_code" {
330 if let Ok(ui_provider) = create_auth_provider(None) {
331 if let Ok(token) = get_azure_ad_token_with_auth(&ui_provider).await {
332 return Ok(token);
333 }
334 }
335 }
336
337 // For connection string authentication, we cannot get Azure AD tokens
338 if self.auth_method == "connection_string" {
339 return Err("Azure AD token not available for connection string authentication".into());
340 }
341
342 // Fallback to regular auth provider with the actual auth method
343 let auth_provider =
344 create_service_bus_auth_provider(&self.auth_method, None, self, http_client.clone())?;
345
346 let token = get_azure_ad_token_with_auth(&auth_provider).await?;
347 Ok(token)
348 }
349
350 /// Lists all Service Bus queues in the configured namespace using Azure AD authentication.
351 ///
352 /// # Arguments
353 ///
354 /// * `http_client` - HTTP client for making API requests
355 ///
356 /// # Returns
357 ///
358 /// A vector of queue names
359 ///
360 /// # Errors
361 ///
362 /// Returns an error if:
363 /// - Authentication fails
364 /// - Azure Management API request fails
365 /// - Required configuration (subscription, resource group, namespace) is missing
366 pub async fn list_queues_azure_ad(
367 &self,
368 http_client: &reqwest::Client,
369 ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
370 let token = self.get_azure_ad_token(http_client).await?;
371 let url = format!(
372 "https://management.azure.com/subscriptions/{}/resourceGroups/{}/providers/Microsoft.ServiceBus/namespaces/{}/queues?api-version=2017-04-01",
373 self.subscription_id()?,
374 self.resource_group()?,
375 self.namespace()?
376 );
377
378 let resp = http_client.get(url).bearer_auth(token).send().await?;
379 let json: serde_json::Value = resp.json().await?;
380 let mut queues = Vec::new();
381 if let Some(arr) = json["value"].as_array() {
382 for queue in arr {
383 if let Some(name) = queue["name"].as_str() {
384 queues.push(name.to_string());
385 }
386 }
387 }
388 Ok(queues)
389 }
390
391 /// Lists all Service Bus namespaces in the configured resource group using Azure AD authentication.
392 ///
393 /// # Arguments
394 ///
395 /// * `http_client` - HTTP client for making API requests
396 ///
397 /// # Returns
398 ///
399 /// A vector of namespace names
400 ///
401 /// # Errors
402 ///
403 /// Returns an error if:
404 /// - Authentication fails
405 /// - Azure Management API request fails
406 /// - Required configuration (subscription, resource group) is missing
407 pub async fn list_namespaces_azure_ad(
408 &self,
409 http_client: &reqwest::Client,
410 ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
411 let token = self.get_azure_ad_token(http_client).await?;
412 let url = format!(
413 "https://management.azure.com/subscriptions/{}/resourceGroups/{}/providers/Microsoft.ServiceBus/namespaces?api-version=2017-04-01",
414 self.subscription_id()?,
415 self.resource_group()?
416 );
417
418 let resp = http_client.get(url).bearer_auth(token).send().await?;
419 let json: serde_json::Value = resp.json().await?;
420 let mut namespaces = Vec::new();
421 if let Some(arr) = json["value"].as_array() {
422 for ns in arr {
423 if let Some(name) = ns["name"].as_str() {
424 namespaces.push(name.to_string());
425 }
426 }
427 }
428 Ok(namespaces)
429 }
430}