server/
service_bus_manager.rs

1//! # Service Bus Manager Module
2//!
3//! This module provides comprehensive Azure Service Bus management capabilities,
4//! including queue operations, message handling, authentication, and Azure resource discovery.
5//!
6//! ## Core Components
7//!
8//! - [`ServiceBusManager`] - Main interface for Service Bus operations
9//! - [`AzureManagementClient`] - Azure Resource Manager integration
10//! - [`ServiceBusCommand`] / [`ServiceBusResponse`] - Command/response pattern for operations
11//! - [`ServiceBusError`] - Comprehensive error handling
12//!
13//! ## Features
14//!
15//! - **Queue Management** - Create, list, and manage Service Bus queues
16//! - **Message Operations** - Send, receive, and bulk process messages
17//! - **Authentication** - Multiple auth methods (Device Code, Client Credentials, Connection String)
18//! - **Resource Discovery** - Discover Azure subscriptions, resource groups, and namespaces
19//! - **Statistics** - Queue statistics and monitoring
20//! - **Bulk Operations** - Efficient bulk message processing
21//!
22//! ## Usage
23//!
24//! ```no_run
25//! use quetty_server::service_bus_manager::{ServiceBusManager, AzureAdConfig};
26//!
27//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
28//!     let config = AzureAdConfig::default();
29//!     let manager = ServiceBusManager::new(config).await?;
30//!
31//!     // List available queues
32//!     let queues = manager.list_queues().await?;
33//!
34//!     // Connect to a specific queue
35//!     manager.connect_to_queue("my-queue").await?;
36//!
37//!     Ok(())
38//! }
39//! ```
40
41pub use self::azure_management_client::{
42    AccessKeys, AzureManagementClient, NamespaceProperties, ResourceGroup, ServiceBusNamespace,
43    Subscription,
44};
45pub use self::commands::ServiceBusCommand;
46pub use self::errors::{ServiceBusError, ServiceBusResult};
47pub use self::manager::ServiceBusManager;
48pub use self::responses::ServiceBusResponse;
49pub use self::types::*;
50
51/// Azure Management Client for resource discovery and management
52pub mod azure_management_client;
53/// Command handlers for processing Service Bus operations
54pub mod command_handlers;
55/// Command definitions for Service Bus operations
56pub mod commands;
57/// Consumer management for message reception
58pub mod consumer_manager;
59/// Error types and handling for Service Bus operations
60pub mod errors;
61/// Main Service Bus Manager implementation
62pub mod manager;
63/// Producer management for message sending
64pub mod producer_manager;
65/// Queue statistics and monitoring services
66pub mod queue_statistics_service;
67/// Response types for Service Bus operations
68pub mod responses;
69/// Core types and data structures
70pub mod types;
71
72use crate::utils::env::EnvUtils;
73
74/// Configuration for Azure Active Directory authentication and resource access.
75///
76/// This struct contains all the necessary information for authenticating with Azure AD
77/// and accessing Azure Service Bus resources. It supports multiple authentication methods
78/// and can load configuration from both direct values and environment variables.
79///
80/// # Authentication Methods
81///
82/// - `device_code` - Interactive device code flow (default for CLI usage)
83/// - `client_secret` - Service principal authentication
84/// - `connection_string` - Direct connection string authentication
85///
86/// # Examples
87///
88/// ```no_run
89/// use quetty_server::service_bus_manager::AzureAdConfig;
90///
91/// let config = AzureAdConfig {
92///     auth_method: "device_code".to_string(),
93///     tenant_id: Some("tenant-id".to_string()),
94///     client_id: Some("client-id".to_string()),
95///     subscription_id: Some("subscription-id".to_string()),
96///     resource_group: Some("resource-group".to_string()),
97///     namespace: Some("servicebus-namespace".to_string()),
98///     ..Default::default()
99/// };
100/// ```
101#[derive(Clone, Debug, serde::Deserialize, Default)]
102pub struct AzureAdConfig {
103    /// Authentication method: "device_code", "client_secret", or "connection_string"
104    #[serde(default = "default_auth_method")]
105    pub auth_method: String,
106    /// Azure AD tenant ID
107    pub tenant_id: Option<String>,
108    /// Azure AD application (client) ID
109    pub client_id: Option<String>,
110    /// Azure AD application client secret (required for client_secret)
111    pub client_secret: Option<String>,
112    /// Azure subscription ID for resource discovery
113    pub subscription_id: Option<String>,
114    /// Resource group name containing the Service Bus namespace
115    pub resource_group: Option<String>,
116    /// Service Bus namespace name
117    pub namespace: Option<String>,
118}
119
120fn default_auth_method() -> String {
121    "connection_string".to_string()
122}
123
124impl AzureAdConfig {
125    /// Gets the Azure AD tenant ID, returning an error if not configured.
126    ///
127    /// # Returns
128    ///
129    /// The tenant ID as a string reference
130    ///
131    /// # Errors
132    ///
133    /// Returns [`ServiceBusError::ConfigurationError`] if the tenant ID is not set
134    pub fn tenant_id(&self) -> Result<&str, ServiceBusError> {
135        self.tenant_id.as_deref()
136            .ok_or_else(|| ServiceBusError::ConfigurationError(
137                "AZURE_AD__TENANT_ID is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
138            ))
139    }
140
141    /// Gets the Azure AD client ID, returning an error if not configured.
142    ///
143    /// # Returns
144    ///
145    /// The client ID as a string reference
146    ///
147    /// # Errors
148    ///
149    /// Returns [`ServiceBusError::ConfigurationError`] if the client ID is not set
150    pub fn client_id(&self) -> Result<&str, ServiceBusError> {
151        self.client_id.as_deref()
152            .ok_or_else(|| ServiceBusError::ConfigurationError(
153                "AZURE_AD__CLIENT_ID is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
154            ))
155    }
156
157    /// Gets the Azure AD client secret, returning an error if not configured.
158    ///
159    /// Required for client credentials authentication flow.
160    ///
161    /// # Returns
162    ///
163    /// The client secret as a string reference
164    ///
165    /// # Errors
166    ///
167    /// Returns [`ServiceBusError::ConfigurationError`] if the client secret is not set
168    pub fn client_secret(&self) -> Result<String, ServiceBusError> {
169        if let Some(ref secret) = self.client_secret {
170            Ok(secret.clone())
171        } else {
172            EnvUtils::get_validated_var("AZURE_AD__CLIENT_SECRET")
173                .map_err(|_| ServiceBusError::ConfigurationError(
174                    "AZURE_AD__CLIENT_SECRET is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
175                ))
176        }
177    }
178
179    /// Gets the Azure subscription ID from config or environment variables.
180    ///
181    /// Checks the config first, then falls back to the `AZURE_AD__SUBSCRIPTION_ID` environment variable.
182    ///
183    /// # Returns
184    ///
185    /// The subscription ID as a string
186    ///
187    /// # Errors
188    ///
189    /// Returns [`ServiceBusError::ConfigurationError`] if the subscription ID is not found
190    pub fn subscription_id(&self) -> Result<String, ServiceBusError> {
191        if let Some(ref id) = self.subscription_id {
192            Ok(id.clone())
193        } else {
194            EnvUtils::get_validated_var("AZURE_AD__SUBSCRIPTION_ID")
195                .map_err(|_| ServiceBusError::ConfigurationError(
196                    "AZURE_AD__SUBSCRIPTION_ID is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
197                ))
198        }
199    }
200
201    /// Gets the Azure resource group name from config or environment variables.
202    ///
203    /// Checks the config first, then falls back to the `AZURE_AD__RESOURCE_GROUP` environment variable.
204    ///
205    /// # Returns
206    ///
207    /// The resource group name as a string
208    ///
209    /// # Errors
210    ///
211    /// Returns [`ServiceBusError::ConfigurationError`] if the resource group is not found
212    pub fn resource_group(&self) -> Result<String, ServiceBusError> {
213        if let Some(ref group) = self.resource_group {
214            Ok(group.clone())
215        } else {
216            EnvUtils::get_validated_var("AZURE_AD__RESOURCE_GROUP")
217                .map_err(|_| ServiceBusError::ConfigurationError(
218                    "AZURE_AD__RESOURCE_GROUP is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
219                ))
220        }
221    }
222
223    /// Gets the Service Bus namespace name from config or environment variables.
224    ///
225    /// Checks the config first, then falls back to the `AZURE_AD__NAMESPACE` environment variable.
226    ///
227    /// # Returns
228    ///
229    /// The namespace name as a string
230    ///
231    /// # Errors
232    ///
233    /// Returns [`ServiceBusError::ConfigurationError`] if the namespace is not found
234    pub fn namespace(&self) -> Result<String, ServiceBusError> {
235        if let Some(ref ns) = self.namespace {
236            Ok(ns.clone())
237        } else {
238            EnvUtils::get_validated_var("AZURE_AD__NAMESPACE")
239                .map_err(|_| ServiceBusError::ConfigurationError(
240                    "AZURE_AD__NAMESPACE is required but not found in configuration or environment variables. Please set this value in .env file or environment.".to_string()
241                ))
242        }
243    }
244
245    /// Checks if tenant ID is configured (in config only, not environment).
246    ///
247    /// # Returns
248    ///
249    /// `true` if tenant ID is set in the configuration
250    pub fn has_tenant_id(&self) -> bool {
251        self.tenant_id.is_some()
252    }
253
254    /// Checks if client ID is configured (in config only, not environment).
255    ///
256    /// # Returns
257    ///
258    /// `true` if client ID is set in the configuration
259    pub fn has_client_id(&self) -> bool {
260        self.client_id.is_some()
261    }
262
263    /// Checks if client secret is configured (in config only, not environment).
264    ///
265    /// # Returns
266    ///
267    /// `true` if client secret is set in the configuration
268    pub fn has_client_secret(&self) -> bool {
269        self.client_secret.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__CLIENT_SECRET")
270    }
271
272    /// Checks if subscription ID is available (config or environment).
273    ///
274    /// # Returns
275    ///
276    /// `true` if subscription ID is available from config or environment variables
277    pub fn has_subscription_id(&self) -> bool {
278        self.subscription_id.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__SUBSCRIPTION_ID")
279    }
280
281    /// Checks if resource group is available (config or environment).
282    ///
283    /// # Returns
284    ///
285    /// `true` if resource group is available from config or environment variables
286    pub fn has_resource_group(&self) -> bool {
287        self.resource_group.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__RESOURCE_GROUP")
288    }
289
290    /// Checks if namespace is available (config or environment).
291    ///
292    /// # Returns
293    ///
294    /// `true` if namespace is available from config or environment variables
295    pub fn has_namespace(&self) -> bool {
296        self.namespace.is_some() || EnvUtils::has_non_empty_var("AZURE_AD__NAMESPACE")
297    }
298
299    /// Obtains an Azure AD access token using the configured authentication method.
300    ///
301    /// This method tries different authentication approaches based on the configured auth method:
302    /// 1. For device code flow, attempts UI-integrated auth first
303    /// 2. Falls back to regular auth provider for other methods
304    /// 3. Returns an error for connection string auth (no Azure AD token available)
305    ///
306    /// # Arguments
307    ///
308    /// * `http_client` - HTTP client for making authentication requests
309    ///
310    /// # Returns
311    ///
312    /// An Azure AD access token string
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if:
317    /// - Authentication fails
318    /// - Connection string auth is used (Azure AD tokens not available)
319    /// - Required configuration is missing
320    pub async fn get_azure_ad_token(
321        &self,
322        http_client: &reqwest::Client,
323    ) -> Result<String, Box<dyn std::error::Error>> {
324        use crate::auth::{
325            create_auth_provider, create_service_bus_auth_provider, get_azure_ad_token_with_auth,
326        };
327
328        // If device code flow is configured, try to use UI-integrated auth first
329        if self.auth_method == "device_code" {
330            if let Ok(ui_provider) = create_auth_provider(None) {
331                if let Ok(token) = get_azure_ad_token_with_auth(&ui_provider).await {
332                    return Ok(token);
333                }
334            }
335        }
336
337        // For connection string authentication, we cannot get Azure AD tokens
338        if self.auth_method == "connection_string" {
339            return Err("Azure AD token not available for connection string authentication".into());
340        }
341
342        // Fallback to regular auth provider with the actual auth method
343        let auth_provider =
344            create_service_bus_auth_provider(&self.auth_method, None, self, http_client.clone())?;
345
346        let token = get_azure_ad_token_with_auth(&auth_provider).await?;
347        Ok(token)
348    }
349
350    /// Lists all Service Bus queues in the configured namespace using Azure AD authentication.
351    ///
352    /// # Arguments
353    ///
354    /// * `http_client` - HTTP client for making API requests
355    ///
356    /// # Returns
357    ///
358    /// A vector of queue names
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if:
363    /// - Authentication fails
364    /// - Azure Management API request fails
365    /// - Required configuration (subscription, resource group, namespace) is missing
366    pub async fn list_queues_azure_ad(
367        &self,
368        http_client: &reqwest::Client,
369    ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
370        let token = self.get_azure_ad_token(http_client).await?;
371        let url = format!(
372            "https://management.azure.com/subscriptions/{}/resourceGroups/{}/providers/Microsoft.ServiceBus/namespaces/{}/queues?api-version=2017-04-01",
373            self.subscription_id()?,
374            self.resource_group()?,
375            self.namespace()?
376        );
377
378        let resp = http_client.get(url).bearer_auth(token).send().await?;
379        let json: serde_json::Value = resp.json().await?;
380        let mut queues = Vec::new();
381        if let Some(arr) = json["value"].as_array() {
382            for queue in arr {
383                if let Some(name) = queue["name"].as_str() {
384                    queues.push(name.to_string());
385                }
386            }
387        }
388        Ok(queues)
389    }
390
391    /// Lists all Service Bus namespaces in the configured resource group using Azure AD authentication.
392    ///
393    /// # Arguments
394    ///
395    /// * `http_client` - HTTP client for making API requests
396    ///
397    /// # Returns
398    ///
399    /// A vector of namespace names
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if:
404    /// - Authentication fails
405    /// - Azure Management API request fails
406    /// - Required configuration (subscription, resource group) is missing
407    pub async fn list_namespaces_azure_ad(
408        &self,
409        http_client: &reqwest::Client,
410    ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
411        let token = self.get_azure_ad_token(http_client).await?;
412        let url = format!(
413            "https://management.azure.com/subscriptions/{}/resourceGroups/{}/providers/Microsoft.ServiceBus/namespaces?api-version=2017-04-01",
414            self.subscription_id()?,
415            self.resource_group()?
416        );
417
418        let resp = http_client.get(url).bearer_auth(token).send().await?;
419        let json: serde_json::Value = resp.json().await?;
420        let mut namespaces = Vec::new();
421        if let Some(arr) = json["value"].as_array() {
422            for ns in arr {
423                if let Some(name) = ns["name"].as_str() {
424                    namespaces.push(name.to_string());
425                }
426            }
427        }
428        Ok(namespaces)
429    }
430}