Skip to main content

queue_runtime/providers/
azure.rs

1//! Azure Service Bus provider implementation.
2//!
3//! This module provides production-ready Azure Service Bus integration with:
4//! - Native session support for ordered message processing
5//! - Connection pooling and sender/receiver caching
6//! - Dead letter queue integration
7//! - Multiple authentication methods (connection string, managed identity, client secret, default credential)
8//! - Comprehensive error classification for retry logic
9//!
10//! ## Authentication Methods
11//!
12//! The provider supports four authentication methods:
13//! - **ConnectionString**: Direct connection string with embedded credentials
14//! - **ManagedIdentity**: Azure Managed Identity for serverless environments
15//! - **ClientSecret**: Service principal with tenant/client ID and secret
16//! - **DefaultCredential**: Default Azure credential chain for development
17//!
18//! ## Session Management
19//!
20//! Azure Service Bus provides native session support with:
21//! - Strict FIFO ordering within session boundaries
22//! - Exclusive session locks during processing
23//! - Automatic lock renewal for long-running operations
24//! - Session state storage for stateful processing
25//!
26//! ## Example
27//!
28//! ```no_run
29//! use queue_runtime::{QueueClientFactory, QueueConfig, ProviderConfig, AzureServiceBusConfig, AzureAuthMethod};
30//! use chrono::Duration;
31//!
32//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33//! let config = QueueConfig {
34//!     provider: ProviderConfig::AzureServiceBus(AzureServiceBusConfig {
35//!         connection_string: Some("Endpoint=sb://...".to_string()),
36//!         namespace: None,
37//!         auth_method: AzureAuthMethod::ConnectionString,
38//!         use_sessions: true,
39//!         session_timeout: Duration::minutes(5),
40//!     }),
41//!     ..Default::default()
42//! };
43//!
44//! let client = QueueClientFactory::create_client(config).await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use crate::client::{QueueProvider, SessionProvider};
50use crate::error::{ConfigurationError, QueueError, SerializationError};
51use crate::message::{
52    Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
53};
54use crate::provider::{AzureServiceBusConfig, ProviderType, SessionSupport};
55use async_trait::async_trait;
56use azure_core::auth::TokenCredential;
57use azure_identity::{
58    ClientSecretCredential, TokenCredentialOptions, VirtualMachineManagedIdentityCredential,
59};
60use chrono::{Duration, Utc};
61use reqwest::{header, Client as HttpClient, StatusCode};
62use serde::{Deserialize, Serialize};
63use std::collections::HashMap;
64use std::fmt;
65use std::str::FromStr;
66use std::sync::Arc;
67use tokio::sync::RwLock;
68
69#[cfg(test)]
70#[path = "azure_tests.rs"]
71mod tests;
72
73// ============================================================================
74// Authentication Types
75// ============================================================================
76
77/// Authentication method for Azure Service Bus
78#[derive(Clone, Serialize, Deserialize)]
79pub enum AzureAuthMethod {
80    /// Connection string with embedded credentials
81    ConnectionString,
82    /// Azure Managed Identity (for serverless environments)
83    ManagedIdentity,
84    /// Service principal with client secret
85    ClientSecret {
86        tenant_id: String,
87        client_id: String,
88        client_secret: String,
89    },
90    /// Default Azure credential chain (for development)
91    DefaultCredential,
92}
93
94impl fmt::Debug for AzureAuthMethod {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::ConnectionString => f.debug_struct("ConnectionString").finish(),
98            Self::ManagedIdentity => f.debug_struct("ManagedIdentity").finish(),
99            Self::ClientSecret {
100                tenant_id,
101                client_id,
102                ..
103            } => f
104                .debug_struct("ClientSecret")
105                .field("tenant_id", tenant_id)
106                .field("client_id", client_id)
107                .field("client_secret", &"<REDACTED>")
108                .finish(),
109            Self::DefaultCredential => f.debug_struct("DefaultCredential").finish(),
110        }
111    }
112}
113
114impl fmt::Display for AzureAuthMethod {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            Self::ConnectionString => write!(f, "ConnectionString"),
118            Self::ManagedIdentity => write!(f, "ManagedIdentity"),
119            Self::ClientSecret { .. } => write!(f, "ClientSecret"),
120            Self::DefaultCredential => write!(f, "DefaultCredential"),
121        }
122    }
123}
124
125// ============================================================================
126// Error Types
127// ============================================================================
128
129/// Azure Service Bus specific errors
130#[derive(Debug, thiserror::Error)]
131pub enum AzureError {
132    #[error("Authentication failed: {0}")]
133    AuthenticationError(String),
134
135    #[error("Network error: {0}")]
136    NetworkError(String),
137
138    #[error("Service Bus error: {0}")]
139    ServiceBusError(String),
140
141    #[error("Message lock lost: {0}")]
142    MessageLockLost(String),
143
144    #[error("Session lock lost: {0}")]
145    SessionLockLost(String),
146
147    #[error("Invalid configuration: {0}")]
148    ConfigurationError(String),
149
150    #[error("Serialization error: {0}")]
151    SerializationError(String),
152}
153
154impl AzureError {
155    /// Check if error is transient and should be retried
156    pub fn is_transient(&self) -> bool {
157        match self {
158            Self::AuthenticationError(_) => false,
159            Self::NetworkError(_) => true,
160            Self::ServiceBusError(_) => true, // Most Service Bus errors are transient
161            Self::MessageLockLost(_) => false,
162            Self::SessionLockLost(_) => false,
163            Self::ConfigurationError(_) => false,
164            Self::SerializationError(_) => false,
165        }
166    }
167
168    /// Map Azure error to QueueError
169    pub fn to_queue_error(self) -> QueueError {
170        match self {
171            Self::AuthenticationError(msg) => QueueError::AuthenticationFailed { message: msg },
172            Self::NetworkError(msg) => QueueError::ConnectionFailed { message: msg },
173            Self::ServiceBusError(msg) => QueueError::ProviderError {
174                provider: "AzureServiceBus".to_string(),
175                code: "ServiceBusError".to_string(),
176                message: msg,
177            },
178            Self::MessageLockLost(msg) => QueueError::MessageNotFound { receipt: msg },
179            Self::SessionLockLost(session_id) => QueueError::SessionNotFound { session_id },
180            Self::ConfigurationError(msg) => {
181                QueueError::ConfigurationError(ConfigurationError::Invalid { message: msg })
182            }
183            Self::SerializationError(msg) => QueueError::SerializationError(
184                SerializationError::JsonError(serde_json::Error::io(std::io::Error::new(
185                    std::io::ErrorKind::InvalidData,
186                    msg,
187                ))),
188            ),
189        }
190    }
191}
192
193// ============================================================================
194// Azure Service Bus Provider
195// ============================================================================
196
197/// Azure Service Bus queue provider implementation using REST API
198///
199/// This provider implements the QueueProvider trait using Azure Service Bus REST API.
200/// It supports:
201/// - Multiple authentication methods (connection string, managed identity, service principal)
202/// - HTTP-based message operations (send, receive, complete, abandon, dead-letter)
203/// - Session support for ordered processing
204/// - Lock token management for PeekLock receive mode
205/// - Comprehensive error classification with retry logic
206pub struct AzureServiceBusProvider {
207    config: AzureServiceBusConfig,
208    http_client: HttpClient,
209    namespace_url: String,
210    credential: Option<Arc<dyn TokenCredential + Send + Sync>>,
211    // Cached lock tokens: receipt_handle -> (lock_token, queue_name)
212    lock_tokens: Arc<RwLock<HashMap<String, (String, String)>>>,
213}
214
215impl fmt::Debug for AzureServiceBusProvider {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        f.debug_struct("AzureServiceBusProvider")
218            .field("config", &self.config)
219            .field("namespace_url", &self.namespace_url)
220            .field(
221                "credential",
222                &self.credential.as_ref().map(|_| "<TokenCredential>"),
223            )
224            .field("lock_tokens", &self.lock_tokens)
225            .finish()
226    }
227}
228
229impl AzureServiceBusProvider {
230    /// Create new Azure Service Bus provider
231    ///
232    /// # Arguments
233    ///
234    /// * `config` - Azure Service Bus configuration with authentication details
235    ///
236    /// # Errors
237    ///
238    /// Returns error if:
239    /// - Connection string is invalid
240    /// - Authentication fails
241    /// - Namespace is not accessible
242    ///
243    /// # Example
244    ///
245    /// ```no_run
246    /// use queue_runtime::{AzureServiceBusConfig, AzureAuthMethod};
247    /// use queue_runtime::providers::AzureServiceBusProvider;
248    /// use chrono::Duration;
249    ///
250    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
251    /// let config = AzureServiceBusConfig {
252    ///     connection_string: Some("Endpoint=sb://...".to_string()),
253    ///     namespace: None,
254    ///     auth_method: AzureAuthMethod::ConnectionString,
255    ///     use_sessions: true,
256    ///     session_timeout: Duration::minutes(5),
257    /// };
258    ///
259    /// let provider = AzureServiceBusProvider::new(config).await?;
260    /// # Ok(())
261    /// # }
262    /// ```
263    pub async fn new(config: AzureServiceBusConfig) -> Result<Self, AzureError> {
264        // Validate configuration
265        Self::validate_config(&config)?;
266
267        // Extract namespace URL and setup authentication
268        let (namespace_url, credential) = match &config.auth_method {
269            AzureAuthMethod::ConnectionString => {
270                let conn_str = config.connection_string.as_ref().ok_or_else(|| {
271                    AzureError::ConfigurationError(
272                        "Connection string required for ConnectionString auth".to_string(),
273                    )
274                })?;
275
276                let namespace_url = Self::parse_connection_string_endpoint(conn_str)?;
277                (namespace_url, None)
278            }
279            AzureAuthMethod::ManagedIdentity => {
280                let namespace = config.namespace.as_ref().ok_or_else(|| {
281                    AzureError::ConfigurationError(
282                        "Namespace required for ManagedIdentity auth".to_string(),
283                    )
284                })?;
285
286                let credential =
287                    VirtualMachineManagedIdentityCredential::new(TokenCredentialOptions::default());
288                let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
289                (
290                    namespace_url,
291                    Some(Arc::new(credential) as Arc<dyn TokenCredential + Send + Sync>),
292                )
293            }
294            AzureAuthMethod::ClientSecret {
295                ref tenant_id,
296                ref client_id,
297                ref client_secret,
298            } => {
299                let namespace = config.namespace.as_ref().ok_or_else(|| {
300                    AzureError::ConfigurationError(
301                        "Namespace required for ClientSecret auth".to_string(),
302                    )
303                })?;
304
305                // Create HTTP client for credential
306                let http_client = azure_core::new_http_client();
307                let authority_host = azure_core::Url::parse("https://login.microsoftonline.com")
308                    .map_err(|e| {
309                        AzureError::ConfigurationError(format!("Invalid authority URL: {}", e))
310                    })?;
311
312                let credential = ClientSecretCredential::new(
313                    http_client,
314                    authority_host,
315                    tenant_id.clone(),
316                    client_id.clone(),
317                    client_secret.clone(),
318                );
319                let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
320                (
321                    namespace_url,
322                    Some(Arc::new(credential) as Arc<dyn TokenCredential + Send + Sync>),
323                )
324            }
325            AzureAuthMethod::DefaultCredential => {
326                let namespace = config.namespace.as_ref().ok_or_else(|| {
327                    AzureError::ConfigurationError(
328                        "Namespace required for DefaultCredential auth".to_string(),
329                    )
330                })?;
331
332                // Use VirtualMachine ManagedIdentity as default
333                let credential =
334                    VirtualMachineManagedIdentityCredential::new(TokenCredentialOptions::default());
335                let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
336                (
337                    namespace_url,
338                    Some(Arc::new(credential) as Arc<dyn TokenCredential + Send + Sync>),
339                )
340            }
341        };
342
343        // Create HTTP client
344        let http_client = HttpClient::builder()
345            .timeout(std::time::Duration::from_secs(30))
346            .build()
347            .map_err(|e| {
348                AzureError::NetworkError(format!("Failed to create HTTP client: {}", e))
349            })?;
350
351        Ok(Self {
352            config,
353            http_client,
354            namespace_url,
355            credential,
356            lock_tokens: Arc::new(RwLock::new(HashMap::new())),
357        })
358    }
359
360    /// Parse endpoint from connection string
361    fn parse_connection_string_endpoint(conn_str: &str) -> Result<String, AzureError> {
362        for part in conn_str.split(';') {
363            if let Some(endpoint) = part.strip_prefix("Endpoint=") {
364                return Ok(endpoint.trim_end_matches('/').to_string());
365            }
366        }
367        Err(AzureError::ConfigurationError(
368            "Invalid connection string: missing Endpoint".to_string(),
369        ))
370    }
371
372    /// Validate Azure Service Bus configuration
373    fn validate_config(config: &AzureServiceBusConfig) -> Result<(), AzureError> {
374        match &config.auth_method {
375            AzureAuthMethod::ConnectionString => {
376                if config.connection_string.is_none() {
377                    return Err(AzureError::ConfigurationError(
378                        "Connection string required for ConnectionString auth method".to_string(),
379                    ));
380                }
381            }
382            AzureAuthMethod::ManagedIdentity | AzureAuthMethod::DefaultCredential => {
383                if config.namespace.is_none() {
384                    return Err(AzureError::ConfigurationError(
385                        "Namespace required for ManagedIdentity/DefaultCredential auth".to_string(),
386                    ));
387                }
388            }
389            AzureAuthMethod::ClientSecret {
390                tenant_id,
391                client_id,
392                client_secret,
393            } => {
394                if config.namespace.is_none() {
395                    return Err(AzureError::ConfigurationError(
396                        "Namespace required for ClientSecret auth".to_string(),
397                    ));
398                }
399                if tenant_id.is_empty() || client_id.is_empty() || client_secret.is_empty() {
400                    return Err(AzureError::ConfigurationError(
401                        "Tenant ID, Client ID, and Client Secret required for ClientSecret auth"
402                            .to_string(),
403                    ));
404                }
405            }
406        }
407
408        Ok(())
409    }
410
411    /// Get authentication token for Service Bus operations
412    async fn get_auth_token(&self) -> Result<String, AzureError> {
413        match &self.credential {
414            Some(cred) => {
415                let scopes = &["https://servicebus.azure.net/.default"];
416                let token = cred.get_token(scopes).await.map_err(|e| {
417                    AzureError::AuthenticationError(format!("Failed to get token: {}", e))
418                })?;
419                Ok(token.token.secret().to_string())
420            }
421            None => {
422                // Connection string auth - parse SharedAccessSignature
423                self.get_sas_token()
424            }
425        }
426    }
427
428    /// Extract SAS token from connection string
429    fn get_sas_token(&self) -> Result<String, AzureError> {
430        let conn_str = self.config.connection_string.as_ref().ok_or_else(|| {
431            AzureError::AuthenticationError("No connection string available".to_string())
432        })?;
433
434        // Parse connection string for SharedAccessKeyName and SharedAccessKey
435        let mut key_name = None;
436        let mut key = None;
437
438        for part in conn_str.split(';') {
439            if let Some(value) = part.strip_prefix("SharedAccessKeyName=") {
440                key_name = Some(value.to_string());
441            } else if let Some(value) = part.strip_prefix("SharedAccessKey=") {
442                key = Some(value.to_string());
443            }
444        }
445
446        let key_name = key_name.ok_or_else(|| {
447            AzureError::AuthenticationError(
448                "Missing SharedAccessKeyName in connection string".to_string(),
449            )
450        })?;
451        let key = key.ok_or_else(|| {
452            AzureError::AuthenticationError(
453                "Missing SharedAccessKey in connection string".to_string(),
454            )
455        })?;
456
457        // Generate SAS token
458        let expiry = (Utc::now() + Duration::hours(1)).timestamp();
459        let resource = self.namespace_url.to_string();
460        let string_to_sign = format!("{}\n{}", urlencoding::encode(&resource), expiry);
461
462        use hmac::{Hmac, Mac};
463        use sha2::Sha256;
464
465        type HmacSha256 = Hmac<Sha256>;
466
467        use base64::{engine::general_purpose::STANDARD, Engine};
468
469        let key_bytes = STANDARD.decode(&key).map_err(|e| {
470            AzureError::AuthenticationError(format!("Invalid SharedAccessKey: {}", e))
471        })?;
472
473        let mut mac = HmacSha256::new_from_slice(&key_bytes).map_err(|e| {
474            AzureError::AuthenticationError(format!("Failed to create HMAC: {}", e))
475        })?;
476        mac.update(string_to_sign.as_bytes());
477        let signature = STANDARD.encode(mac.finalize().into_bytes());
478
479        let sas = format!(
480            "SharedAccessSignature sr={}&sig={}&se={}&skn={}",
481            urlencoding::encode(&resource),
482            urlencoding::encode(&signature),
483            expiry,
484            urlencoding::encode(&key_name)
485        );
486
487        Ok(sas)
488    }
489}
490
491// ============================================================================
492// Azure Service Bus REST API Types
493// ============================================================================
494
495/// Message body for sending messages
496#[derive(Debug, Serialize, Deserialize)]
497struct ServiceBusMessageBody {
498    #[serde(rename = "ContentType")]
499    content_type: String,
500    #[serde(rename = "Body")]
501    body: String, // Base64-encoded
502    #[serde(rename = "BrokerProperties")]
503    broker_properties: BrokerProperties,
504}
505
506#[derive(Debug, Serialize, Deserialize)]
507struct BrokerProperties {
508    #[serde(rename = "MessageId")]
509    message_id: String,
510    #[serde(rename = "SessionId", skip_serializing_if = "Option::is_none")]
511    session_id: Option<String>,
512    #[serde(rename = "TimeToLive", skip_serializing_if = "Option::is_none")]
513    time_to_live: Option<u64>,
514}
515
516/// Batch receive response structure
517#[derive(Debug, Deserialize)]
518struct ServiceBusMessageResponse {
519    #[serde(rename = "Body")]
520    body: String,
521    #[serde(rename = "BrokerProperties")]
522    broker_properties: ReceivedBrokerProperties,
523}
524
525#[allow(dead_code)] // Used when receive operations are implemented
526#[derive(Debug, Deserialize)]
527struct ReceivedServiceBusMessage {
528    #[serde(rename = "Body")]
529    body: String,
530    #[serde(rename = "BrokerProperties")]
531    broker_properties: ReceivedBrokerProperties,
532}
533
534#[allow(dead_code)] // Used when receive operations are implemented
535#[derive(Debug, Deserialize)]
536struct ReceivedBrokerProperties {
537    #[serde(rename = "MessageId")]
538    message_id: String,
539    #[serde(rename = "SessionId")]
540    session_id: Option<String>,
541    #[serde(rename = "LockToken")]
542    lock_token: String,
543    #[serde(rename = "DeliveryCount")]
544    delivery_count: u32,
545    #[serde(rename = "EnqueuedTimeUtc")]
546    enqueued_time_utc: String,
547}
548
549// ============================================================================
550// QueueProvider Implementation
551// ============================================================================
552
553#[async_trait]
554impl QueueProvider for AzureServiceBusProvider {
555    async fn send_message(
556        &self,
557        queue: &QueueName,
558        message: &Message,
559    ) -> Result<MessageId, QueueError> {
560        // Generate message ID
561        let message_id = MessageId::new();
562
563        // Serialize message body (it's already Bytes, just base64 encode it)
564        use base64::{engine::general_purpose::STANDARD, Engine};
565        let body_base64 = STANDARD.encode(&message.body);
566
567        // Build broker properties
568        let broker_props = BrokerProperties {
569            message_id: message_id.to_string(),
570            session_id: message.session_id.as_ref().map(|s| s.to_string()),
571            time_to_live: message
572                .time_to_live
573                .as_ref()
574                .map(|ttl| ttl.num_seconds() as u64),
575        };
576
577        // Build URL: {namespace}/{queue}/messages
578        let url = format!("{}/{}/messages", self.namespace_url, queue.as_str());
579
580        // Get auth token
581        let auth_token = self
582            .get_auth_token()
583            .await
584            .map_err(|e| e.to_queue_error())?;
585
586        // Send HTTP POST request
587        let response = self
588            .http_client
589            .post(&url)
590            .header(header::AUTHORIZATION, auth_token)
591            .header(
592                header::CONTENT_TYPE,
593                "application/atom+xml;type=entry;charset=utf-8",
594            )
595            .header(
596                "BrokerProperties",
597                serde_json::to_string(&broker_props).unwrap(),
598            )
599            .body(body_base64)
600            .send()
601            .await
602            .map_err(|e| {
603                AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
604            })?;
605
606        // Check response status
607        match response.status() {
608            StatusCode::CREATED | StatusCode::OK => Ok(message_id),
609            status => {
610                let error_body = response.text().await.unwrap_or_default();
611                Err(QueueError::ProviderError {
612                    provider: "AzureServiceBus".to_string(),
613                    code: status.as_str().to_string(),
614                    message: format!("Send failed: {}", error_body),
615                })
616            }
617        }
618    }
619
620    async fn send_messages(
621        &self,
622        queue: &QueueName,
623        messages: &[Message],
624    ) -> Result<Vec<MessageId>, QueueError> {
625        // Azure Service Bus supports batch send (max 100 messages)
626        if messages.len() > 100 {
627            return Err(QueueError::BatchTooLarge {
628                size: messages.len(),
629                max_size: 100,
630            });
631        }
632
633        if messages.is_empty() {
634            return Ok(Vec::new());
635        }
636
637        // Build batch request body - array of messages
638        let mut batch_messages = Vec::with_capacity(messages.len());
639        let mut message_ids = Vec::with_capacity(messages.len());
640
641        use base64::{engine::general_purpose::STANDARD, Engine};
642
643        for message in messages {
644            let message_id = MessageId::new();
645            let body_base64 = STANDARD.encode(&message.body);
646
647            let broker_props = BrokerProperties {
648                message_id: message_id.to_string(),
649                session_id: message.session_id.as_ref().map(|s| s.to_string()),
650                time_to_live: message
651                    .time_to_live
652                    .as_ref()
653                    .map(|ttl| ttl.num_seconds() as u64),
654            };
655
656            batch_messages.push(ServiceBusMessageBody {
657                content_type: "application/octet-stream".to_string(),
658                body: body_base64,
659                broker_properties: broker_props,
660            });
661
662            message_ids.push(message_id);
663        }
664
665        // Build URL: {namespace}/{queue}/messages
666        let url = format!("{}/{}/messages", self.namespace_url, queue.as_str());
667
668        // Get auth token
669        let auth_token = self
670            .get_auth_token()
671            .await
672            .map_err(|e| e.to_queue_error())?;
673
674        // Send batch HTTP POST request with JSON array
675        let response = self
676            .http_client
677            .post(&url)
678            .header(header::AUTHORIZATION, auth_token)
679            .header(header::CONTENT_TYPE, "application/json")
680            .json(&batch_messages)
681            .send()
682            .await
683            .map_err(|e| {
684                AzureError::NetworkError(format!("Batch send HTTP request failed: {}", e))
685                    .to_queue_error()
686            })?;
687
688        // Check response status
689        match response.status() {
690            StatusCode::CREATED | StatusCode::OK => Ok(message_ids),
691            StatusCode::PAYLOAD_TOO_LARGE => Err(QueueError::BatchTooLarge {
692                size: messages.len(),
693                max_size: 100,
694            }),
695            StatusCode::TOO_MANY_REQUESTS => {
696                let retry_after = response
697                    .headers()
698                    .get("Retry-After")
699                    .and_then(|v| v.to_str().ok())
700                    .and_then(|s| s.parse::<u64>().ok())
701                    .unwrap_or(30);
702
703                Err(QueueError::ProviderError {
704                    provider: "AzureServiceBus".to_string(),
705                    code: "ThrottlingError".to_string(),
706                    message: format!("Request throttled, retry after {} seconds", retry_after),
707                })
708            }
709            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
710                let error_body = response.text().await.unwrap_or_default();
711                Err(QueueError::AuthenticationFailed {
712                    message: format!("Authentication failed: {}", error_body),
713                })
714            }
715            status => {
716                let error_body = response.text().await.unwrap_or_default();
717                Err(QueueError::ProviderError {
718                    provider: "AzureServiceBus".to_string(),
719                    code: status.as_str().to_string(),
720                    message: format!("Batch send failed: {}", error_body),
721                })
722            }
723        }
724    }
725
726    async fn receive_message(
727        &self,
728        queue: &QueueName,
729        timeout: Duration,
730    ) -> Result<Option<ReceivedMessage>, QueueError> {
731        // Azure Service Bus receive uses HTTP DELETE with peek-lock
732        // URL: {namespace}/{queue}/messages/head?timeout={seconds}
733        let url = format!(
734            "{}/{}/messages/head?timeout={}",
735            self.namespace_url,
736            queue.as_str(),
737            timeout.num_seconds()
738        );
739
740        // Get auth token
741        let auth_token = self
742            .get_auth_token()
743            .await
744            .map_err(|e| e.to_queue_error())?;
745
746        // Send HTTP DELETE request (peek-lock mode)
747        let response = self
748            .http_client
749            .delete(&url)
750            .header(header::AUTHORIZATION, auth_token)
751            .send()
752            .await
753            .map_err(|e| {
754                AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
755            })?;
756
757        // Check response status
758        match response.status() {
759            StatusCode::OK | StatusCode::CREATED => {
760                // Parse BrokerProperties from response header
761                let broker_props = response
762                    .headers()
763                    .get("BrokerProperties")
764                    .and_then(|v| v.to_str().ok())
765                    .and_then(|s| serde_json::from_str::<ReceivedBrokerProperties>(s).ok())
766                    .ok_or_else(|| QueueError::ProviderError {
767                        provider: "AzureServiceBus".to_string(),
768                        code: "InvalidResponse".to_string(),
769                        message: "Missing or invalid BrokerProperties header".to_string(),
770                    })?;
771
772                // Get message body (base64 encoded)
773                let body_base64 = response.text().await.map_err(|e| {
774                    AzureError::NetworkError(format!("Failed to read response body: {}", e))
775                        .to_queue_error()
776                })?;
777
778                // Decode base64 body
779                use base64::{engine::general_purpose::STANDARD, Engine};
780                let body =
781                    STANDARD
782                        .decode(&body_base64)
783                        .map_err(|e| QueueError::ProviderError {
784                            provider: "AzureServiceBus".to_string(),
785                            code: "DecodingError".to_string(),
786                            message: format!("Failed to decode message body: {}", e),
787                        })?;
788
789                // Parse enqueued time
790                let first_delivered_at =
791                    chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
792                        .map(|dt| Timestamp::from_datetime(dt.with_timezone(&chrono::Utc)))
793                        .unwrap_or_else(|_| Timestamp::now());
794
795                // Create receipt handle combining lock token and queue name
796                // Lock expires in 30 seconds by default (Azure Service Bus default)
797                let expires_at = Timestamp::from_datetime(Utc::now() + Duration::seconds(30));
798                let receipt_str = format!("{}::{}", broker_props.lock_token, queue.as_str());
799                let receipt = ReceiptHandle::new(
800                    receipt_str.clone(),
801                    expires_at,
802                    ProviderType::AzureServiceBus,
803                );
804
805                // Store lock token for later acknowledgment
806                self.lock_tokens.write().await.insert(
807                    receipt_str,
808                    (broker_props.lock_token.clone(), queue.as_str().to_string()),
809                );
810
811                // Parse Azure message ID
812                let message_id = MessageId::from_str(&broker_props.message_id)
813                    .unwrap_or_else(|_| MessageId::new());
814
815                // Create received message
816                let received_message = ReceivedMessage {
817                    message_id,
818                    body: bytes::Bytes::from(body),
819                    attributes: HashMap::new(),
820                    session_id: broker_props.session_id.map(SessionId::new).transpose()?,
821                    correlation_id: None,
822                    receipt_handle: receipt,
823                    delivery_count: broker_props.delivery_count,
824                    first_delivered_at,
825                    delivered_at: Timestamp::now(),
826                };
827
828                Ok(Some(received_message))
829            }
830            StatusCode::NO_CONTENT => {
831                // No messages available
832                Ok(None)
833            }
834            status => {
835                let error_body = response.text().await.unwrap_or_default();
836                Err(QueueError::ProviderError {
837                    provider: "AzureServiceBus".to_string(),
838                    code: status.as_str().to_string(),
839                    message: format!("Receive failed: {}", error_body),
840                })
841            }
842        }
843    }
844
845    async fn receive_messages(
846        &self,
847        queue: &QueueName,
848        max_messages: u32,
849        timeout: Duration,
850    ) -> Result<Vec<ReceivedMessage>, QueueError> {
851        // Azure Service Bus max batch receive is 32 messages
852        if max_messages > 32 {
853            return Err(QueueError::BatchTooLarge {
854                size: max_messages as usize,
855                max_size: 32,
856            });
857        }
858
859        if max_messages == 0 {
860            return Ok(Vec::new());
861        }
862
863        // Build URL with maxMessageCount parameter for batch receive
864        // {namespace}/{queue}/messages/head?timeout={seconds}&maxMessageCount={count}
865        let url = format!(
866            "{}/{}/messages/head?timeout={}&maxMessageCount={}",
867            self.namespace_url,
868            queue.as_str(),
869            timeout.num_seconds(),
870            max_messages
871        );
872
873        // Get auth token
874        let auth_token = self
875            .get_auth_token()
876            .await
877            .map_err(|e| e.to_queue_error())?;
878
879        // Receive messages using HTTP DELETE (PeekLock mode)
880        let response = self
881            .http_client
882            .delete(&url)
883            .header(header::AUTHORIZATION, auth_token)
884            .send()
885            .await
886            .map_err(|e| {
887                AzureError::NetworkError(format!("Batch receive HTTP request failed: {}", e))
888                    .to_queue_error()
889            })?;
890
891        // Parse response
892        match response.status() {
893            StatusCode::OK | StatusCode::CREATED => {
894                // Parse JSON array response
895                let messages_data: Vec<ServiceBusMessageResponse> =
896                    response.json().await.map_err(|e| {
897                        AzureError::SerializationError(format!(
898                            "Failed to parse batch receive response: {}",
899                            e
900                        ))
901                        .to_queue_error()
902                    })?;
903
904                let mut received_messages = Vec::with_capacity(messages_data.len());
905
906                use base64::{engine::general_purpose::STANDARD, Engine};
907
908                for msg_data in messages_data {
909                    let broker_props = msg_data.broker_properties;
910
911                    // Decode base64 body
912                    let body = STANDARD.decode(&msg_data.body).map_err(|e| {
913                        AzureError::SerializationError(format!(
914                            "Failed to decode message body: {}",
915                            e
916                        ))
917                        .to_queue_error()
918                    })?;
919
920                    // Parse enqueued time
921                    let enqueued_time =
922                        chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
923                            .map_err(|e| {
924                                AzureError::SerializationError(format!(
925                                    "Failed to parse enqueued time: {}",
926                                    e
927                                ))
928                                .to_queue_error()
929                            })?;
930                    let first_delivered_at =
931                        Timestamp::from_datetime(enqueued_time.with_timezone(&Utc));
932
933                    // Create receipt handle with lock expiration (30s default)
934                    let expires_at = Timestamp::from_datetime(Utc::now() + Duration::seconds(30));
935                    let receipt_str = format!("{}::{}", broker_props.lock_token, queue.as_str());
936                    let receipt = ReceiptHandle::new(
937                        receipt_str.clone(),
938                        expires_at,
939                        ProviderType::AzureServiceBus,
940                    );
941
942                    // Store lock token for acknowledgment
943                    self.lock_tokens.write().await.insert(
944                        receipt_str,
945                        (broker_props.lock_token.clone(), queue.as_str().to_string()),
946                    );
947
948                    // Parse Azure message ID
949                    let message_id = MessageId::from_str(&broker_props.message_id)
950                        .unwrap_or_else(|_| MessageId::new());
951
952                    // Create received message
953                    let received_message = ReceivedMessage {
954                        message_id,
955                        body: bytes::Bytes::from(body),
956                        attributes: HashMap::new(),
957                        session_id: broker_props.session_id.map(SessionId::new).transpose()?,
958                        correlation_id: None,
959                        receipt_handle: receipt,
960                        delivery_count: broker_props.delivery_count,
961                        first_delivered_at,
962                        delivered_at: Timestamp::now(),
963                    };
964
965                    received_messages.push(received_message);
966                }
967
968                Ok(received_messages)
969            }
970            StatusCode::NO_CONTENT => {
971                // No messages available
972                Ok(Vec::new())
973            }
974            StatusCode::TOO_MANY_REQUESTS => {
975                let retry_after = response
976                    .headers()
977                    .get("Retry-After")
978                    .and_then(|v| v.to_str().ok())
979                    .and_then(|s| s.parse::<u64>().ok())
980                    .unwrap_or(30);
981
982                Err(QueueError::ProviderError {
983                    provider: "AzureServiceBus".to_string(),
984                    code: "ThrottlingError".to_string(),
985                    message: format!("Request throttled, retry after {} seconds", retry_after),
986                })
987            }
988            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
989                let error_body = response.text().await.unwrap_or_default();
990                Err(QueueError::AuthenticationFailed {
991                    message: format!("Authentication failed: {}", error_body),
992                })
993            }
994            status => {
995                let error_body = response.text().await.unwrap_or_default();
996                Err(QueueError::ProviderError {
997                    provider: "AzureServiceBus".to_string(),
998                    code: status.as_str().to_string(),
999                    message: format!("Batch receive failed: {}", error_body),
1000                })
1001            }
1002        }
1003    }
1004
1005    async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1006        // Extract lock token and queue name from receipt handle
1007        let lock_tokens = self.lock_tokens.read().await;
1008        let (lock_token, queue_name) =
1009            lock_tokens
1010                .get(receipt.handle())
1011                .ok_or_else(|| QueueError::MessageNotFound {
1012                    receipt: receipt.handle().to_string(),
1013                })?;
1014
1015        // Azure Service Bus complete uses HTTP DELETE to {namespace}/{queue}/messages/{messageId}/{lockToken}
1016        let url = format!(
1017            "{}/{}/messages/head/{}",
1018            self.namespace_url,
1019            queue_name,
1020            urlencoding::encode(lock_token)
1021        );
1022
1023        // Get auth token
1024        let auth_token = self
1025            .get_auth_token()
1026            .await
1027            .map_err(|e| e.to_queue_error())?;
1028
1029        // Send HTTP DELETE request
1030        let response = self
1031            .http_client
1032            .delete(&url)
1033            .header(header::AUTHORIZATION, auth_token)
1034            .send()
1035            .await
1036            .map_err(|e| {
1037                AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1038            })?;
1039
1040        // Check response status
1041        match response.status() {
1042            StatusCode::OK | StatusCode::NO_CONTENT => {
1043                // Remove lock token from cache
1044                drop(lock_tokens);
1045                self.lock_tokens.write().await.remove(receipt.handle());
1046                Ok(())
1047            }
1048            StatusCode::GONE | StatusCode::NOT_FOUND => {
1049                // Lock expired or message already processed
1050                Err(QueueError::MessageNotFound {
1051                    receipt: receipt.handle().to_string(),
1052                })
1053            }
1054            status => {
1055                let error_body = response.text().await.unwrap_or_default();
1056                Err(QueueError::ProviderError {
1057                    provider: "AzureServiceBus".to_string(),
1058                    code: status.as_str().to_string(),
1059                    message: format!("Complete failed: {}", error_body),
1060                })
1061            }
1062        }
1063    }
1064
1065    async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1066        // Extract lock token and queue name from receipt handle
1067        let lock_tokens = self.lock_tokens.read().await;
1068        let (lock_token, queue_name) =
1069            lock_tokens
1070                .get(receipt.handle())
1071                .ok_or_else(|| QueueError::MessageNotFound {
1072                    receipt: receipt.handle().to_string(),
1073                })?;
1074
1075        // Azure Service Bus abandon uses HTTP PUT to {namespace}/{queue}/messages/{messageId}/{lockToken}
1076        // with empty body to unlock the message
1077        let url = format!(
1078            "{}/{}/messages/head/{}",
1079            self.namespace_url,
1080            queue_name,
1081            urlencoding::encode(lock_token)
1082        );
1083
1084        // Get auth token
1085        let auth_token = self
1086            .get_auth_token()
1087            .await
1088            .map_err(|e| e.to_queue_error())?;
1089
1090        // Send HTTP PUT request with empty body to abandon
1091        let response = self
1092            .http_client
1093            .put(&url)
1094            .header(header::AUTHORIZATION, auth_token)
1095            .header(header::CONTENT_LENGTH, "0")
1096            .send()
1097            .await
1098            .map_err(|e| {
1099                AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1100            })?;
1101
1102        // Check response status
1103        match response.status() {
1104            StatusCode::OK | StatusCode::NO_CONTENT => {
1105                // Remove lock token from cache
1106                drop(lock_tokens);
1107                self.lock_tokens.write().await.remove(receipt.handle());
1108                Ok(())
1109            }
1110            StatusCode::GONE | StatusCode::NOT_FOUND => {
1111                // Lock expired or message already processed
1112                Err(QueueError::MessageNotFound {
1113                    receipt: receipt.handle().to_string(),
1114                })
1115            }
1116            status => {
1117                let error_body = response.text().await.unwrap_or_default();
1118                Err(QueueError::ProviderError {
1119                    provider: "AzureServiceBus".to_string(),
1120                    code: status.as_str().to_string(),
1121                    message: format!("Abandon failed: {}", error_body),
1122                })
1123            }
1124        }
1125    }
1126
1127    async fn dead_letter_message(
1128        &self,
1129        receipt: &ReceiptHandle,
1130        reason: &str,
1131    ) -> Result<(), QueueError> {
1132        // Extract lock token and queue name from receipt handle
1133        let lock_tokens = self.lock_tokens.read().await;
1134        let (lock_token, queue_name) =
1135            lock_tokens
1136                .get(receipt.handle())
1137                .ok_or_else(|| QueueError::MessageNotFound {
1138                    receipt: receipt.handle().to_string(),
1139                })?;
1140
1141        // Azure Service Bus dead letter uses HTTP DELETE to {namespace}/{queue}/messages/{messageId}/{lockToken}
1142        // with custom properties in the DeadLetterReason header
1143        let url = format!(
1144            "{}/{}/messages/head/{}/$deadletter",
1145            self.namespace_url,
1146            queue_name,
1147            urlencoding::encode(lock_token)
1148        );
1149
1150        // Get auth token
1151        let auth_token = self
1152            .get_auth_token()
1153            .await
1154            .map_err(|e| e.to_queue_error())?;
1155
1156        // Build dead letter properties as JSON
1157        let properties = serde_json::json!({
1158            "DeadLetterReason": reason,
1159            "DeadLetterErrorDescription": "Message processing failed"
1160        });
1161
1162        // Send HTTP POST request to dead letter
1163        let response = self
1164            .http_client
1165            .post(&url)
1166            .header(header::AUTHORIZATION, auth_token)
1167            .header(header::CONTENT_TYPE, "application/json")
1168            .json(&properties)
1169            .send()
1170            .await
1171            .map_err(|e| {
1172                AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1173            })?;
1174
1175        // Check response status
1176        match response.status() {
1177            StatusCode::OK | StatusCode::NO_CONTENT | StatusCode::CREATED => {
1178                // Remove lock token from cache
1179                drop(lock_tokens);
1180                self.lock_tokens.write().await.remove(receipt.handle());
1181                Ok(())
1182            }
1183            StatusCode::GONE | StatusCode::NOT_FOUND => {
1184                // Lock expired or message already processed
1185                Err(QueueError::MessageNotFound {
1186                    receipt: receipt.handle().to_string(),
1187                })
1188            }
1189            status => {
1190                let error_body = response.text().await.unwrap_or_default();
1191                Err(QueueError::ProviderError {
1192                    provider: "AzureServiceBus".to_string(),
1193                    code: status.as_str().to_string(),
1194                    message: format!("Dead letter failed: {}", error_body),
1195                })
1196            }
1197        }
1198    }
1199
1200    async fn create_session_client(
1201        &self,
1202        _queue: &QueueName,
1203        _session_id: Option<SessionId>,
1204    ) -> Result<Box<dyn SessionProvider>, QueueError> {
1205        // TODO: Accept session and create session provider
1206        Err(QueueError::ProviderError {
1207            provider: "AzureServiceBus".to_string(),
1208            code: "NotImplemented".to_string(),
1209            message: "Azure Service Bus session client not yet implemented".to_string(),
1210        })
1211    }
1212
1213    fn provider_type(&self) -> ProviderType {
1214        ProviderType::AzureServiceBus
1215    }
1216
1217    fn supports_sessions(&self) -> SessionSupport {
1218        SessionSupport::Native
1219    }
1220
1221    fn supports_batching(&self) -> bool {
1222        true
1223    }
1224
1225    fn max_batch_size(&self) -> u32 {
1226        100 // Azure Service Bus max batch send
1227    }
1228}
1229
1230// ============================================================================
1231// Azure Session Provider
1232// ============================================================================
1233
1234/// Azure Service Bus session provider for ordered message processing
1235pub struct AzureSessionProvider {
1236    session_id: SessionId,
1237    #[allow(dead_code)] // Will be used in session implementation
1238    queue_name: QueueName,
1239    session_expires_at: Timestamp,
1240    // TODO: Add actual Azure session receiver
1241}
1242
1243impl AzureSessionProvider {
1244    /// Create new session provider
1245    pub fn new(session_id: SessionId, queue_name: QueueName, session_timeout: Duration) -> Self {
1246        let session_expires_at = Timestamp::from_datetime(Utc::now() + session_timeout);
1247
1248        Self {
1249            session_id,
1250            queue_name,
1251            session_expires_at,
1252        }
1253    }
1254}
1255
1256#[async_trait]
1257impl SessionProvider for AzureSessionProvider {
1258    async fn receive_message(
1259        &self,
1260        _timeout: Duration,
1261    ) -> Result<Option<ReceivedMessage>, QueueError> {
1262        // TODO: Implement session receive
1263        Err(QueueError::ProviderError {
1264            provider: "AzureServiceBus".to_string(),
1265            code: "NotImplemented".to_string(),
1266            message: "Azure Service Bus session receive not yet implemented".to_string(),
1267        })
1268    }
1269
1270    async fn complete_message(&self, _receipt: &ReceiptHandle) -> Result<(), QueueError> {
1271        // TODO: Implement session complete
1272        Err(QueueError::ProviderError {
1273            provider: "AzureServiceBus".to_string(),
1274            code: "NotImplemented".to_string(),
1275            message: "Azure Service Bus session complete not yet implemented".to_string(),
1276        })
1277    }
1278
1279    async fn abandon_message(&self, _receipt: &ReceiptHandle) -> Result<(), QueueError> {
1280        // TODO: Implement session abandon
1281        Err(QueueError::ProviderError {
1282            provider: "AzureServiceBus".to_string(),
1283            code: "NotImplemented".to_string(),
1284            message: "Azure Service Bus session abandon not yet implemented".to_string(),
1285        })
1286    }
1287
1288    async fn dead_letter_message(
1289        &self,
1290        _receipt: &ReceiptHandle,
1291        _reason: &str,
1292    ) -> Result<(), QueueError> {
1293        // TODO: Implement session dead letter
1294        Err(QueueError::ProviderError {
1295            provider: "AzureServiceBus".to_string(),
1296            code: "NotImplemented".to_string(),
1297            message: "Azure Service Bus session dead letter not yet implemented".to_string(),
1298        })
1299    }
1300
1301    async fn renew_session_lock(&self) -> Result<(), QueueError> {
1302        // TODO: Implement session lock renewal
1303        Err(QueueError::ProviderError {
1304            provider: "AzureServiceBus".to_string(),
1305            code: "NotImplemented".to_string(),
1306            message: "Azure Service Bus session lock renewal not yet implemented".to_string(),
1307        })
1308    }
1309
1310    async fn close_session(&self) -> Result<(), QueueError> {
1311        // TODO: Implement session close
1312        Ok(())
1313    }
1314
1315    fn session_id(&self) -> &SessionId {
1316        &self.session_id
1317    }
1318
1319    fn session_expires_at(&self) -> Timestamp {
1320        self.session_expires_at
1321    }
1322}
1323
1324// ============================================================================
1325// Internal Azure Types (Placeholders)
1326// ============================================================================
1327
1328/// Placeholder for Azure Service Bus sender
1329#[allow(dead_code)] // Placeholder struct for future implementation
1330#[derive(Debug)]
1331struct AzureSender {
1332    queue_name: QueueName,
1333}
1334
1335#[allow(dead_code)] // Placeholder impl for future implementation
1336impl AzureSender {
1337    fn new(queue_name: QueueName) -> Result<Self, AzureError> {
1338        Ok(Self { queue_name })
1339    }
1340}
1341
1342/// Placeholder for Azure Service Bus receiver
1343#[allow(dead_code)] // Placeholder struct for future implementation
1344#[derive(Debug)]
1345struct AzureReceiver {
1346    queue_name: QueueName,
1347}
1348
1349#[allow(dead_code)] // Placeholder impl for future implementation
1350impl AzureReceiver {
1351    fn new(queue_name: QueueName) -> Result<Self, AzureError> {
1352        Ok(Self { queue_name })
1353    }
1354}
1355
1356/// Placeholder for Azure Service Bus session receiver
1357#[allow(dead_code)] // Placeholder struct for future implementation
1358#[derive(Debug)]
1359struct AzureSessionReceiver {
1360    session_id: SessionId,
1361    queue_name: QueueName,
1362}
1363
1364#[allow(dead_code)] // Placeholder impl for future implementation
1365impl AzureSessionReceiver {
1366    fn new(session_id: SessionId, queue_name: QueueName) -> Result<Self, AzureError> {
1367        Ok(Self {
1368            session_id,
1369            queue_name,
1370        })
1371    }
1372}