1use crate::client::{QueueProvider, SessionProvider};
50use crate::error::{ConfigurationError, QueueError, SerializationError};
51use crate::message::{
52 Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
53};
54use crate::provider::{AzureServiceBusConfig, ProviderType, SessionSupport};
55use async_trait::async_trait;
56use azure_core::auth::TokenCredential;
57use azure_identity::{
58 ClientSecretCredential, TokenCredentialOptions, VirtualMachineManagedIdentityCredential,
59};
60use chrono::{Duration, Utc};
61use reqwest::{header, Client as HttpClient, StatusCode};
62use serde::{Deserialize, Serialize};
63use std::collections::HashMap;
64use std::fmt;
65use std::str::FromStr;
66use std::sync::Arc;
67use tokio::sync::RwLock;
68
69#[cfg(test)]
70#[path = "azure_tests.rs"]
71mod tests;
72
73#[derive(Clone, Serialize, Deserialize)]
79pub enum AzureAuthMethod {
80 ConnectionString,
82 ManagedIdentity,
84 ClientSecret {
86 tenant_id: String,
87 client_id: String,
88 client_secret: String,
89 },
90 DefaultCredential,
92}
93
94impl fmt::Debug for AzureAuthMethod {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 match self {
97 Self::ConnectionString => f.debug_struct("ConnectionString").finish(),
98 Self::ManagedIdentity => f.debug_struct("ManagedIdentity").finish(),
99 Self::ClientSecret {
100 tenant_id,
101 client_id,
102 ..
103 } => f
104 .debug_struct("ClientSecret")
105 .field("tenant_id", tenant_id)
106 .field("client_id", client_id)
107 .field("client_secret", &"<REDACTED>")
108 .finish(),
109 Self::DefaultCredential => f.debug_struct("DefaultCredential").finish(),
110 }
111 }
112}
113
114impl fmt::Display for AzureAuthMethod {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 Self::ConnectionString => write!(f, "ConnectionString"),
118 Self::ManagedIdentity => write!(f, "ManagedIdentity"),
119 Self::ClientSecret { .. } => write!(f, "ClientSecret"),
120 Self::DefaultCredential => write!(f, "DefaultCredential"),
121 }
122 }
123}
124
125#[derive(Debug, thiserror::Error)]
131pub enum AzureError {
132 #[error("Authentication failed: {0}")]
133 AuthenticationError(String),
134
135 #[error("Network error: {0}")]
136 NetworkError(String),
137
138 #[error("Service Bus error: {0}")]
139 ServiceBusError(String),
140
141 #[error("Message lock lost: {0}")]
142 MessageLockLost(String),
143
144 #[error("Session lock lost: {0}")]
145 SessionLockLost(String),
146
147 #[error("Invalid configuration: {0}")]
148 ConfigurationError(String),
149
150 #[error("Serialization error: {0}")]
151 SerializationError(String),
152}
153
154impl AzureError {
155 pub fn is_transient(&self) -> bool {
157 match self {
158 Self::AuthenticationError(_) => false,
159 Self::NetworkError(_) => true,
160 Self::ServiceBusError(_) => true, Self::MessageLockLost(_) => false,
162 Self::SessionLockLost(_) => false,
163 Self::ConfigurationError(_) => false,
164 Self::SerializationError(_) => false,
165 }
166 }
167
168 pub fn to_queue_error(self) -> QueueError {
170 match self {
171 Self::AuthenticationError(msg) => QueueError::AuthenticationFailed { message: msg },
172 Self::NetworkError(msg) => QueueError::ConnectionFailed { message: msg },
173 Self::ServiceBusError(msg) => QueueError::ProviderError {
174 provider: "AzureServiceBus".to_string(),
175 code: "ServiceBusError".to_string(),
176 message: msg,
177 },
178 Self::MessageLockLost(msg) => QueueError::MessageNotFound { receipt: msg },
179 Self::SessionLockLost(session_id) => QueueError::SessionNotFound { session_id },
180 Self::ConfigurationError(msg) => {
181 QueueError::ConfigurationError(ConfigurationError::Invalid { message: msg })
182 }
183 Self::SerializationError(msg) => QueueError::SerializationError(
184 SerializationError::JsonError(serde_json::Error::io(std::io::Error::new(
185 std::io::ErrorKind::InvalidData,
186 msg,
187 ))),
188 ),
189 }
190 }
191}
192
193pub struct AzureServiceBusProvider {
207 config: AzureServiceBusConfig,
208 http_client: HttpClient,
209 namespace_url: String,
210 credential: Option<Arc<dyn TokenCredential + Send + Sync>>,
211 lock_tokens: Arc<RwLock<HashMap<String, (String, String)>>>,
213}
214
215impl fmt::Debug for AzureServiceBusProvider {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 f.debug_struct("AzureServiceBusProvider")
218 .field("config", &self.config)
219 .field("namespace_url", &self.namespace_url)
220 .field(
221 "credential",
222 &self.credential.as_ref().map(|_| "<TokenCredential>"),
223 )
224 .field("lock_tokens", &self.lock_tokens)
225 .finish()
226 }
227}
228
229impl AzureServiceBusProvider {
230 pub async fn new(config: AzureServiceBusConfig) -> Result<Self, AzureError> {
264 Self::validate_config(&config)?;
266
267 let (namespace_url, credential) = match &config.auth_method {
269 AzureAuthMethod::ConnectionString => {
270 let conn_str = config.connection_string.as_ref().ok_or_else(|| {
271 AzureError::ConfigurationError(
272 "Connection string required for ConnectionString auth".to_string(),
273 )
274 })?;
275
276 let namespace_url = Self::parse_connection_string_endpoint(conn_str)?;
277 (namespace_url, None)
278 }
279 AzureAuthMethod::ManagedIdentity => {
280 let namespace = config.namespace.as_ref().ok_or_else(|| {
281 AzureError::ConfigurationError(
282 "Namespace required for ManagedIdentity auth".to_string(),
283 )
284 })?;
285
286 let credential =
287 VirtualMachineManagedIdentityCredential::new(TokenCredentialOptions::default());
288 let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
289 (
290 namespace_url,
291 Some(Arc::new(credential) as Arc<dyn TokenCredential + Send + Sync>),
292 )
293 }
294 AzureAuthMethod::ClientSecret {
295 ref tenant_id,
296 ref client_id,
297 ref client_secret,
298 } => {
299 let namespace = config.namespace.as_ref().ok_or_else(|| {
300 AzureError::ConfigurationError(
301 "Namespace required for ClientSecret auth".to_string(),
302 )
303 })?;
304
305 let http_client = azure_core::new_http_client();
307 let authority_host = azure_core::Url::parse("https://login.microsoftonline.com")
308 .map_err(|e| {
309 AzureError::ConfigurationError(format!("Invalid authority URL: {}", e))
310 })?;
311
312 let credential = ClientSecretCredential::new(
313 http_client,
314 authority_host,
315 tenant_id.clone(),
316 client_id.clone(),
317 client_secret.clone(),
318 );
319 let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
320 (
321 namespace_url,
322 Some(Arc::new(credential) as Arc<dyn TokenCredential + Send + Sync>),
323 )
324 }
325 AzureAuthMethod::DefaultCredential => {
326 let namespace = config.namespace.as_ref().ok_or_else(|| {
327 AzureError::ConfigurationError(
328 "Namespace required for DefaultCredential auth".to_string(),
329 )
330 })?;
331
332 let credential =
334 VirtualMachineManagedIdentityCredential::new(TokenCredentialOptions::default());
335 let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
336 (
337 namespace_url,
338 Some(Arc::new(credential) as Arc<dyn TokenCredential + Send + Sync>),
339 )
340 }
341 };
342
343 let http_client = HttpClient::builder()
345 .timeout(std::time::Duration::from_secs(30))
346 .build()
347 .map_err(|e| {
348 AzureError::NetworkError(format!("Failed to create HTTP client: {}", e))
349 })?;
350
351 Ok(Self {
352 config,
353 http_client,
354 namespace_url,
355 credential,
356 lock_tokens: Arc::new(RwLock::new(HashMap::new())),
357 })
358 }
359
360 fn parse_connection_string_endpoint(conn_str: &str) -> Result<String, AzureError> {
362 for part in conn_str.split(';') {
363 if let Some(endpoint) = part.strip_prefix("Endpoint=") {
364 return Ok(endpoint.trim_end_matches('/').to_string());
365 }
366 }
367 Err(AzureError::ConfigurationError(
368 "Invalid connection string: missing Endpoint".to_string(),
369 ))
370 }
371
372 fn validate_config(config: &AzureServiceBusConfig) -> Result<(), AzureError> {
374 match &config.auth_method {
375 AzureAuthMethod::ConnectionString => {
376 if config.connection_string.is_none() {
377 return Err(AzureError::ConfigurationError(
378 "Connection string required for ConnectionString auth method".to_string(),
379 ));
380 }
381 }
382 AzureAuthMethod::ManagedIdentity | AzureAuthMethod::DefaultCredential => {
383 if config.namespace.is_none() {
384 return Err(AzureError::ConfigurationError(
385 "Namespace required for ManagedIdentity/DefaultCredential auth".to_string(),
386 ));
387 }
388 }
389 AzureAuthMethod::ClientSecret {
390 tenant_id,
391 client_id,
392 client_secret,
393 } => {
394 if config.namespace.is_none() {
395 return Err(AzureError::ConfigurationError(
396 "Namespace required for ClientSecret auth".to_string(),
397 ));
398 }
399 if tenant_id.is_empty() || client_id.is_empty() || client_secret.is_empty() {
400 return Err(AzureError::ConfigurationError(
401 "Tenant ID, Client ID, and Client Secret required for ClientSecret auth"
402 .to_string(),
403 ));
404 }
405 }
406 }
407
408 Ok(())
409 }
410
411 async fn get_auth_token(&self) -> Result<String, AzureError> {
413 match &self.credential {
414 Some(cred) => {
415 let scopes = &["https://servicebus.azure.net/.default"];
416 let token = cred.get_token(scopes).await.map_err(|e| {
417 AzureError::AuthenticationError(format!("Failed to get token: {}", e))
418 })?;
419 Ok(token.token.secret().to_string())
420 }
421 None => {
422 self.get_sas_token()
424 }
425 }
426 }
427
428 fn get_sas_token(&self) -> Result<String, AzureError> {
430 let conn_str = self.config.connection_string.as_ref().ok_or_else(|| {
431 AzureError::AuthenticationError("No connection string available".to_string())
432 })?;
433
434 let mut key_name = None;
436 let mut key = None;
437
438 for part in conn_str.split(';') {
439 if let Some(value) = part.strip_prefix("SharedAccessKeyName=") {
440 key_name = Some(value.to_string());
441 } else if let Some(value) = part.strip_prefix("SharedAccessKey=") {
442 key = Some(value.to_string());
443 }
444 }
445
446 let key_name = key_name.ok_or_else(|| {
447 AzureError::AuthenticationError(
448 "Missing SharedAccessKeyName in connection string".to_string(),
449 )
450 })?;
451 let key = key.ok_or_else(|| {
452 AzureError::AuthenticationError(
453 "Missing SharedAccessKey in connection string".to_string(),
454 )
455 })?;
456
457 let expiry = (Utc::now() + Duration::hours(1)).timestamp();
459 let resource = self.namespace_url.to_string();
460 let string_to_sign = format!("{}\n{}", urlencoding::encode(&resource), expiry);
461
462 use hmac::{Hmac, Mac};
463 use sha2::Sha256;
464
465 type HmacSha256 = Hmac<Sha256>;
466
467 use base64::{engine::general_purpose::STANDARD, Engine};
468
469 let key_bytes = STANDARD.decode(&key).map_err(|e| {
470 AzureError::AuthenticationError(format!("Invalid SharedAccessKey: {}", e))
471 })?;
472
473 let mut mac = HmacSha256::new_from_slice(&key_bytes).map_err(|e| {
474 AzureError::AuthenticationError(format!("Failed to create HMAC: {}", e))
475 })?;
476 mac.update(string_to_sign.as_bytes());
477 let signature = STANDARD.encode(mac.finalize().into_bytes());
478
479 let sas = format!(
480 "SharedAccessSignature sr={}&sig={}&se={}&skn={}",
481 urlencoding::encode(&resource),
482 urlencoding::encode(&signature),
483 expiry,
484 urlencoding::encode(&key_name)
485 );
486
487 Ok(sas)
488 }
489}
490
491#[derive(Debug, Serialize, Deserialize)]
497struct ServiceBusMessageBody {
498 #[serde(rename = "ContentType")]
499 content_type: String,
500 #[serde(rename = "Body")]
501 body: String, #[serde(rename = "BrokerProperties")]
503 broker_properties: BrokerProperties,
504}
505
506#[derive(Debug, Serialize, Deserialize)]
507struct BrokerProperties {
508 #[serde(rename = "MessageId")]
509 message_id: String,
510 #[serde(rename = "SessionId", skip_serializing_if = "Option::is_none")]
511 session_id: Option<String>,
512 #[serde(rename = "TimeToLive", skip_serializing_if = "Option::is_none")]
513 time_to_live: Option<u64>,
514}
515
516#[derive(Debug, Deserialize)]
518struct ServiceBusMessageResponse {
519 #[serde(rename = "Body")]
520 body: String,
521 #[serde(rename = "BrokerProperties")]
522 broker_properties: ReceivedBrokerProperties,
523}
524
525#[allow(dead_code)] #[derive(Debug, Deserialize)]
527struct ReceivedServiceBusMessage {
528 #[serde(rename = "Body")]
529 body: String,
530 #[serde(rename = "BrokerProperties")]
531 broker_properties: ReceivedBrokerProperties,
532}
533
534#[allow(dead_code)] #[derive(Debug, Deserialize)]
536struct ReceivedBrokerProperties {
537 #[serde(rename = "MessageId")]
538 message_id: String,
539 #[serde(rename = "SessionId")]
540 session_id: Option<String>,
541 #[serde(rename = "LockToken")]
542 lock_token: String,
543 #[serde(rename = "DeliveryCount")]
544 delivery_count: u32,
545 #[serde(rename = "EnqueuedTimeUtc")]
546 enqueued_time_utc: String,
547}
548
549#[async_trait]
554impl QueueProvider for AzureServiceBusProvider {
555 async fn send_message(
556 &self,
557 queue: &QueueName,
558 message: &Message,
559 ) -> Result<MessageId, QueueError> {
560 let message_id = MessageId::new();
562
563 use base64::{engine::general_purpose::STANDARD, Engine};
565 let body_base64 = STANDARD.encode(&message.body);
566
567 let broker_props = BrokerProperties {
569 message_id: message_id.to_string(),
570 session_id: message.session_id.as_ref().map(|s| s.to_string()),
571 time_to_live: message
572 .time_to_live
573 .as_ref()
574 .map(|ttl| ttl.num_seconds() as u64),
575 };
576
577 let url = format!("{}/{}/messages", self.namespace_url, queue.as_str());
579
580 let auth_token = self
582 .get_auth_token()
583 .await
584 .map_err(|e| e.to_queue_error())?;
585
586 let response = self
588 .http_client
589 .post(&url)
590 .header(header::AUTHORIZATION, auth_token)
591 .header(
592 header::CONTENT_TYPE,
593 "application/atom+xml;type=entry;charset=utf-8",
594 )
595 .header(
596 "BrokerProperties",
597 serde_json::to_string(&broker_props).unwrap(),
598 )
599 .body(body_base64)
600 .send()
601 .await
602 .map_err(|e| {
603 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
604 })?;
605
606 match response.status() {
608 StatusCode::CREATED | StatusCode::OK => Ok(message_id),
609 status => {
610 let error_body = response.text().await.unwrap_or_default();
611 Err(QueueError::ProviderError {
612 provider: "AzureServiceBus".to_string(),
613 code: status.as_str().to_string(),
614 message: format!("Send failed: {}", error_body),
615 })
616 }
617 }
618 }
619
620 async fn send_messages(
621 &self,
622 queue: &QueueName,
623 messages: &[Message],
624 ) -> Result<Vec<MessageId>, QueueError> {
625 if messages.len() > 100 {
627 return Err(QueueError::BatchTooLarge {
628 size: messages.len(),
629 max_size: 100,
630 });
631 }
632
633 if messages.is_empty() {
634 return Ok(Vec::new());
635 }
636
637 let mut batch_messages = Vec::with_capacity(messages.len());
639 let mut message_ids = Vec::with_capacity(messages.len());
640
641 use base64::{engine::general_purpose::STANDARD, Engine};
642
643 for message in messages {
644 let message_id = MessageId::new();
645 let body_base64 = STANDARD.encode(&message.body);
646
647 let broker_props = BrokerProperties {
648 message_id: message_id.to_string(),
649 session_id: message.session_id.as_ref().map(|s| s.to_string()),
650 time_to_live: message
651 .time_to_live
652 .as_ref()
653 .map(|ttl| ttl.num_seconds() as u64),
654 };
655
656 batch_messages.push(ServiceBusMessageBody {
657 content_type: "application/octet-stream".to_string(),
658 body: body_base64,
659 broker_properties: broker_props,
660 });
661
662 message_ids.push(message_id);
663 }
664
665 let url = format!("{}/{}/messages", self.namespace_url, queue.as_str());
667
668 let auth_token = self
670 .get_auth_token()
671 .await
672 .map_err(|e| e.to_queue_error())?;
673
674 let response = self
676 .http_client
677 .post(&url)
678 .header(header::AUTHORIZATION, auth_token)
679 .header(header::CONTENT_TYPE, "application/json")
680 .json(&batch_messages)
681 .send()
682 .await
683 .map_err(|e| {
684 AzureError::NetworkError(format!("Batch send HTTP request failed: {}", e))
685 .to_queue_error()
686 })?;
687
688 match response.status() {
690 StatusCode::CREATED | StatusCode::OK => Ok(message_ids),
691 StatusCode::PAYLOAD_TOO_LARGE => Err(QueueError::BatchTooLarge {
692 size: messages.len(),
693 max_size: 100,
694 }),
695 StatusCode::TOO_MANY_REQUESTS => {
696 let retry_after = response
697 .headers()
698 .get("Retry-After")
699 .and_then(|v| v.to_str().ok())
700 .and_then(|s| s.parse::<u64>().ok())
701 .unwrap_or(30);
702
703 Err(QueueError::ProviderError {
704 provider: "AzureServiceBus".to_string(),
705 code: "ThrottlingError".to_string(),
706 message: format!("Request throttled, retry after {} seconds", retry_after),
707 })
708 }
709 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
710 let error_body = response.text().await.unwrap_or_default();
711 Err(QueueError::AuthenticationFailed {
712 message: format!("Authentication failed: {}", error_body),
713 })
714 }
715 status => {
716 let error_body = response.text().await.unwrap_or_default();
717 Err(QueueError::ProviderError {
718 provider: "AzureServiceBus".to_string(),
719 code: status.as_str().to_string(),
720 message: format!("Batch send failed: {}", error_body),
721 })
722 }
723 }
724 }
725
726 async fn receive_message(
727 &self,
728 queue: &QueueName,
729 timeout: Duration,
730 ) -> Result<Option<ReceivedMessage>, QueueError> {
731 let url = format!(
734 "{}/{}/messages/head?timeout={}",
735 self.namespace_url,
736 queue.as_str(),
737 timeout.num_seconds()
738 );
739
740 let auth_token = self
742 .get_auth_token()
743 .await
744 .map_err(|e| e.to_queue_error())?;
745
746 let response = self
748 .http_client
749 .delete(&url)
750 .header(header::AUTHORIZATION, auth_token)
751 .send()
752 .await
753 .map_err(|e| {
754 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
755 })?;
756
757 match response.status() {
759 StatusCode::OK | StatusCode::CREATED => {
760 let broker_props = response
762 .headers()
763 .get("BrokerProperties")
764 .and_then(|v| v.to_str().ok())
765 .and_then(|s| serde_json::from_str::<ReceivedBrokerProperties>(s).ok())
766 .ok_or_else(|| QueueError::ProviderError {
767 provider: "AzureServiceBus".to_string(),
768 code: "InvalidResponse".to_string(),
769 message: "Missing or invalid BrokerProperties header".to_string(),
770 })?;
771
772 let body_base64 = response.text().await.map_err(|e| {
774 AzureError::NetworkError(format!("Failed to read response body: {}", e))
775 .to_queue_error()
776 })?;
777
778 use base64::{engine::general_purpose::STANDARD, Engine};
780 let body =
781 STANDARD
782 .decode(&body_base64)
783 .map_err(|e| QueueError::ProviderError {
784 provider: "AzureServiceBus".to_string(),
785 code: "DecodingError".to_string(),
786 message: format!("Failed to decode message body: {}", e),
787 })?;
788
789 let first_delivered_at =
791 chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
792 .map(|dt| Timestamp::from_datetime(dt.with_timezone(&chrono::Utc)))
793 .unwrap_or_else(|_| Timestamp::now());
794
795 let expires_at = Timestamp::from_datetime(Utc::now() + Duration::seconds(30));
798 let receipt_str = format!("{}::{}", broker_props.lock_token, queue.as_str());
799 let receipt = ReceiptHandle::new(
800 receipt_str.clone(),
801 expires_at,
802 ProviderType::AzureServiceBus,
803 );
804
805 self.lock_tokens.write().await.insert(
807 receipt_str,
808 (broker_props.lock_token.clone(), queue.as_str().to_string()),
809 );
810
811 let message_id = MessageId::from_str(&broker_props.message_id)
813 .unwrap_or_else(|_| MessageId::new());
814
815 let received_message = ReceivedMessage {
817 message_id,
818 body: bytes::Bytes::from(body),
819 attributes: HashMap::new(),
820 session_id: broker_props.session_id.map(SessionId::new).transpose()?,
821 correlation_id: None,
822 receipt_handle: receipt,
823 delivery_count: broker_props.delivery_count,
824 first_delivered_at,
825 delivered_at: Timestamp::now(),
826 };
827
828 Ok(Some(received_message))
829 }
830 StatusCode::NO_CONTENT => {
831 Ok(None)
833 }
834 status => {
835 let error_body = response.text().await.unwrap_or_default();
836 Err(QueueError::ProviderError {
837 provider: "AzureServiceBus".to_string(),
838 code: status.as_str().to_string(),
839 message: format!("Receive failed: {}", error_body),
840 })
841 }
842 }
843 }
844
845 async fn receive_messages(
846 &self,
847 queue: &QueueName,
848 max_messages: u32,
849 timeout: Duration,
850 ) -> Result<Vec<ReceivedMessage>, QueueError> {
851 if max_messages > 32 {
853 return Err(QueueError::BatchTooLarge {
854 size: max_messages as usize,
855 max_size: 32,
856 });
857 }
858
859 if max_messages == 0 {
860 return Ok(Vec::new());
861 }
862
863 let url = format!(
866 "{}/{}/messages/head?timeout={}&maxMessageCount={}",
867 self.namespace_url,
868 queue.as_str(),
869 timeout.num_seconds(),
870 max_messages
871 );
872
873 let auth_token = self
875 .get_auth_token()
876 .await
877 .map_err(|e| e.to_queue_error())?;
878
879 let response = self
881 .http_client
882 .delete(&url)
883 .header(header::AUTHORIZATION, auth_token)
884 .send()
885 .await
886 .map_err(|e| {
887 AzureError::NetworkError(format!("Batch receive HTTP request failed: {}", e))
888 .to_queue_error()
889 })?;
890
891 match response.status() {
893 StatusCode::OK | StatusCode::CREATED => {
894 let messages_data: Vec<ServiceBusMessageResponse> =
896 response.json().await.map_err(|e| {
897 AzureError::SerializationError(format!(
898 "Failed to parse batch receive response: {}",
899 e
900 ))
901 .to_queue_error()
902 })?;
903
904 let mut received_messages = Vec::with_capacity(messages_data.len());
905
906 use base64::{engine::general_purpose::STANDARD, Engine};
907
908 for msg_data in messages_data {
909 let broker_props = msg_data.broker_properties;
910
911 let body = STANDARD.decode(&msg_data.body).map_err(|e| {
913 AzureError::SerializationError(format!(
914 "Failed to decode message body: {}",
915 e
916 ))
917 .to_queue_error()
918 })?;
919
920 let enqueued_time =
922 chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
923 .map_err(|e| {
924 AzureError::SerializationError(format!(
925 "Failed to parse enqueued time: {}",
926 e
927 ))
928 .to_queue_error()
929 })?;
930 let first_delivered_at =
931 Timestamp::from_datetime(enqueued_time.with_timezone(&Utc));
932
933 let expires_at = Timestamp::from_datetime(Utc::now() + Duration::seconds(30));
935 let receipt_str = format!("{}::{}", broker_props.lock_token, queue.as_str());
936 let receipt = ReceiptHandle::new(
937 receipt_str.clone(),
938 expires_at,
939 ProviderType::AzureServiceBus,
940 );
941
942 self.lock_tokens.write().await.insert(
944 receipt_str,
945 (broker_props.lock_token.clone(), queue.as_str().to_string()),
946 );
947
948 let message_id = MessageId::from_str(&broker_props.message_id)
950 .unwrap_or_else(|_| MessageId::new());
951
952 let received_message = ReceivedMessage {
954 message_id,
955 body: bytes::Bytes::from(body),
956 attributes: HashMap::new(),
957 session_id: broker_props.session_id.map(SessionId::new).transpose()?,
958 correlation_id: None,
959 receipt_handle: receipt,
960 delivery_count: broker_props.delivery_count,
961 first_delivered_at,
962 delivered_at: Timestamp::now(),
963 };
964
965 received_messages.push(received_message);
966 }
967
968 Ok(received_messages)
969 }
970 StatusCode::NO_CONTENT => {
971 Ok(Vec::new())
973 }
974 StatusCode::TOO_MANY_REQUESTS => {
975 let retry_after = response
976 .headers()
977 .get("Retry-After")
978 .and_then(|v| v.to_str().ok())
979 .and_then(|s| s.parse::<u64>().ok())
980 .unwrap_or(30);
981
982 Err(QueueError::ProviderError {
983 provider: "AzureServiceBus".to_string(),
984 code: "ThrottlingError".to_string(),
985 message: format!("Request throttled, retry after {} seconds", retry_after),
986 })
987 }
988 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
989 let error_body = response.text().await.unwrap_or_default();
990 Err(QueueError::AuthenticationFailed {
991 message: format!("Authentication failed: {}", error_body),
992 })
993 }
994 status => {
995 let error_body = response.text().await.unwrap_or_default();
996 Err(QueueError::ProviderError {
997 provider: "AzureServiceBus".to_string(),
998 code: status.as_str().to_string(),
999 message: format!("Batch receive failed: {}", error_body),
1000 })
1001 }
1002 }
1003 }
1004
1005 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1006 let lock_tokens = self.lock_tokens.read().await;
1008 let (lock_token, queue_name) =
1009 lock_tokens
1010 .get(receipt.handle())
1011 .ok_or_else(|| QueueError::MessageNotFound {
1012 receipt: receipt.handle().to_string(),
1013 })?;
1014
1015 let url = format!(
1017 "{}/{}/messages/head/{}",
1018 self.namespace_url,
1019 queue_name,
1020 urlencoding::encode(lock_token)
1021 );
1022
1023 let auth_token = self
1025 .get_auth_token()
1026 .await
1027 .map_err(|e| e.to_queue_error())?;
1028
1029 let response = self
1031 .http_client
1032 .delete(&url)
1033 .header(header::AUTHORIZATION, auth_token)
1034 .send()
1035 .await
1036 .map_err(|e| {
1037 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1038 })?;
1039
1040 match response.status() {
1042 StatusCode::OK | StatusCode::NO_CONTENT => {
1043 drop(lock_tokens);
1045 self.lock_tokens.write().await.remove(receipt.handle());
1046 Ok(())
1047 }
1048 StatusCode::GONE | StatusCode::NOT_FOUND => {
1049 Err(QueueError::MessageNotFound {
1051 receipt: receipt.handle().to_string(),
1052 })
1053 }
1054 status => {
1055 let error_body = response.text().await.unwrap_or_default();
1056 Err(QueueError::ProviderError {
1057 provider: "AzureServiceBus".to_string(),
1058 code: status.as_str().to_string(),
1059 message: format!("Complete failed: {}", error_body),
1060 })
1061 }
1062 }
1063 }
1064
1065 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1066 let lock_tokens = self.lock_tokens.read().await;
1068 let (lock_token, queue_name) =
1069 lock_tokens
1070 .get(receipt.handle())
1071 .ok_or_else(|| QueueError::MessageNotFound {
1072 receipt: receipt.handle().to_string(),
1073 })?;
1074
1075 let url = format!(
1078 "{}/{}/messages/head/{}",
1079 self.namespace_url,
1080 queue_name,
1081 urlencoding::encode(lock_token)
1082 );
1083
1084 let auth_token = self
1086 .get_auth_token()
1087 .await
1088 .map_err(|e| e.to_queue_error())?;
1089
1090 let response = self
1092 .http_client
1093 .put(&url)
1094 .header(header::AUTHORIZATION, auth_token)
1095 .header(header::CONTENT_LENGTH, "0")
1096 .send()
1097 .await
1098 .map_err(|e| {
1099 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1100 })?;
1101
1102 match response.status() {
1104 StatusCode::OK | StatusCode::NO_CONTENT => {
1105 drop(lock_tokens);
1107 self.lock_tokens.write().await.remove(receipt.handle());
1108 Ok(())
1109 }
1110 StatusCode::GONE | StatusCode::NOT_FOUND => {
1111 Err(QueueError::MessageNotFound {
1113 receipt: receipt.handle().to_string(),
1114 })
1115 }
1116 status => {
1117 let error_body = response.text().await.unwrap_or_default();
1118 Err(QueueError::ProviderError {
1119 provider: "AzureServiceBus".to_string(),
1120 code: status.as_str().to_string(),
1121 message: format!("Abandon failed: {}", error_body),
1122 })
1123 }
1124 }
1125 }
1126
1127 async fn dead_letter_message(
1128 &self,
1129 receipt: &ReceiptHandle,
1130 reason: &str,
1131 ) -> Result<(), QueueError> {
1132 let lock_tokens = self.lock_tokens.read().await;
1134 let (lock_token, queue_name) =
1135 lock_tokens
1136 .get(receipt.handle())
1137 .ok_or_else(|| QueueError::MessageNotFound {
1138 receipt: receipt.handle().to_string(),
1139 })?;
1140
1141 let url = format!(
1144 "{}/{}/messages/head/{}/$deadletter",
1145 self.namespace_url,
1146 queue_name,
1147 urlencoding::encode(lock_token)
1148 );
1149
1150 let auth_token = self
1152 .get_auth_token()
1153 .await
1154 .map_err(|e| e.to_queue_error())?;
1155
1156 let properties = serde_json::json!({
1158 "DeadLetterReason": reason,
1159 "DeadLetterErrorDescription": "Message processing failed"
1160 });
1161
1162 let response = self
1164 .http_client
1165 .post(&url)
1166 .header(header::AUTHORIZATION, auth_token)
1167 .header(header::CONTENT_TYPE, "application/json")
1168 .json(&properties)
1169 .send()
1170 .await
1171 .map_err(|e| {
1172 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1173 })?;
1174
1175 match response.status() {
1177 StatusCode::OK | StatusCode::NO_CONTENT | StatusCode::CREATED => {
1178 drop(lock_tokens);
1180 self.lock_tokens.write().await.remove(receipt.handle());
1181 Ok(())
1182 }
1183 StatusCode::GONE | StatusCode::NOT_FOUND => {
1184 Err(QueueError::MessageNotFound {
1186 receipt: receipt.handle().to_string(),
1187 })
1188 }
1189 status => {
1190 let error_body = response.text().await.unwrap_or_default();
1191 Err(QueueError::ProviderError {
1192 provider: "AzureServiceBus".to_string(),
1193 code: status.as_str().to_string(),
1194 message: format!("Dead letter failed: {}", error_body),
1195 })
1196 }
1197 }
1198 }
1199
1200 async fn create_session_client(
1201 &self,
1202 _queue: &QueueName,
1203 _session_id: Option<SessionId>,
1204 ) -> Result<Box<dyn SessionProvider>, QueueError> {
1205 Err(QueueError::ProviderError {
1207 provider: "AzureServiceBus".to_string(),
1208 code: "NotImplemented".to_string(),
1209 message: "Azure Service Bus session client not yet implemented".to_string(),
1210 })
1211 }
1212
1213 fn provider_type(&self) -> ProviderType {
1214 ProviderType::AzureServiceBus
1215 }
1216
1217 fn supports_sessions(&self) -> SessionSupport {
1218 SessionSupport::Native
1219 }
1220
1221 fn supports_batching(&self) -> bool {
1222 true
1223 }
1224
1225 fn max_batch_size(&self) -> u32 {
1226 100 }
1228}
1229
1230pub struct AzureSessionProvider {
1236 session_id: SessionId,
1237 #[allow(dead_code)] queue_name: QueueName,
1239 session_expires_at: Timestamp,
1240 }
1242
1243impl AzureSessionProvider {
1244 pub fn new(session_id: SessionId, queue_name: QueueName, session_timeout: Duration) -> Self {
1246 let session_expires_at = Timestamp::from_datetime(Utc::now() + session_timeout);
1247
1248 Self {
1249 session_id,
1250 queue_name,
1251 session_expires_at,
1252 }
1253 }
1254}
1255
1256#[async_trait]
1257impl SessionProvider for AzureSessionProvider {
1258 async fn receive_message(
1259 &self,
1260 _timeout: Duration,
1261 ) -> Result<Option<ReceivedMessage>, QueueError> {
1262 Err(QueueError::ProviderError {
1264 provider: "AzureServiceBus".to_string(),
1265 code: "NotImplemented".to_string(),
1266 message: "Azure Service Bus session receive not yet implemented".to_string(),
1267 })
1268 }
1269
1270 async fn complete_message(&self, _receipt: &ReceiptHandle) -> Result<(), QueueError> {
1271 Err(QueueError::ProviderError {
1273 provider: "AzureServiceBus".to_string(),
1274 code: "NotImplemented".to_string(),
1275 message: "Azure Service Bus session complete not yet implemented".to_string(),
1276 })
1277 }
1278
1279 async fn abandon_message(&self, _receipt: &ReceiptHandle) -> Result<(), QueueError> {
1280 Err(QueueError::ProviderError {
1282 provider: "AzureServiceBus".to_string(),
1283 code: "NotImplemented".to_string(),
1284 message: "Azure Service Bus session abandon not yet implemented".to_string(),
1285 })
1286 }
1287
1288 async fn dead_letter_message(
1289 &self,
1290 _receipt: &ReceiptHandle,
1291 _reason: &str,
1292 ) -> Result<(), QueueError> {
1293 Err(QueueError::ProviderError {
1295 provider: "AzureServiceBus".to_string(),
1296 code: "NotImplemented".to_string(),
1297 message: "Azure Service Bus session dead letter not yet implemented".to_string(),
1298 })
1299 }
1300
1301 async fn renew_session_lock(&self) -> Result<(), QueueError> {
1302 Err(QueueError::ProviderError {
1304 provider: "AzureServiceBus".to_string(),
1305 code: "NotImplemented".to_string(),
1306 message: "Azure Service Bus session lock renewal not yet implemented".to_string(),
1307 })
1308 }
1309
1310 async fn close_session(&self) -> Result<(), QueueError> {
1311 Ok(())
1313 }
1314
1315 fn session_id(&self) -> &SessionId {
1316 &self.session_id
1317 }
1318
1319 fn session_expires_at(&self) -> Timestamp {
1320 self.session_expires_at
1321 }
1322}
1323
1324#[allow(dead_code)] #[derive(Debug)]
1331struct AzureSender {
1332 queue_name: QueueName,
1333}
1334
1335#[allow(dead_code)] impl AzureSender {
1337 fn new(queue_name: QueueName) -> Result<Self, AzureError> {
1338 Ok(Self { queue_name })
1339 }
1340}
1341
1342#[allow(dead_code)] #[derive(Debug)]
1345struct AzureReceiver {
1346 queue_name: QueueName,
1347}
1348
1349#[allow(dead_code)] impl AzureReceiver {
1351 fn new(queue_name: QueueName) -> Result<Self, AzureError> {
1352 Ok(Self { queue_name })
1353 }
1354}
1355
1356#[allow(dead_code)] #[derive(Debug)]
1359struct AzureSessionReceiver {
1360 session_id: SessionId,
1361 queue_name: QueueName,
1362}
1363
1364#[allow(dead_code)] impl AzureSessionReceiver {
1366 fn new(session_id: SessionId, queue_name: QueueName) -> Result<Self, AzureError> {
1367 Ok(Self {
1368 session_id,
1369 queue_name,
1370 })
1371 }
1372}