1use crate::client::{QueueProvider, SessionProvider};
50use crate::error::{ConfigurationError, QueueError, SerializationError};
51use crate::message::{
52 Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
53};
54use crate::provider::{AzureServiceBusConfig, ProviderType, SessionSupport};
55use async_trait::async_trait;
56use azure_core::credentials::Secret as AzureSecret;
57use azure_core::credentials::TokenCredential;
58use azure_identity::{
59 ClientSecretCredential, ClientSecretCredentialOptions, DeveloperToolsCredential,
60 ManagedIdentityCredential,
61};
62use chrono::{Duration, Utc};
63use reqwest::{header, Client as HttpClient, StatusCode};
64use serde::{Deserialize, Serialize};
65use std::collections::{HashMap, HashSet};
66use std::fmt;
67use std::str::FromStr;
68use std::sync::Arc;
69use tokio::sync::RwLock;
70
71#[cfg(test)]
72#[path = "azure_tests.rs"]
73mod tests;
74
75async fn get_bearer_token(
85 cred: &(dyn TokenCredential + Send + Sync),
86) -> Result<String, AzureError> {
87 let scopes = &["https://servicebus.azure.net/.default"];
88 let token = cred
89 .get_token(scopes, None)
90 .await
91 .map_err(|e| AzureError::AuthenticationError(format!("Failed to get token: {}", e)))?;
92 Ok(token.token.secret().to_string())
94}
95
96fn generate_sas_token(namespace_url: &str, conn_str: &str) -> Result<String, AzureError> {
107 let mut key_name = None;
108 let mut key = None;
109
110 for part in conn_str.split(';') {
111 if let Some(value) = part.strip_prefix("SharedAccessKeyName=") {
112 key_name = Some(value.to_string());
113 } else if let Some(value) = part.strip_prefix("SharedAccessKey=") {
114 key = Some(value.to_string());
115 }
116 }
117
118 let key_name = key_name.ok_or_else(|| {
119 AzureError::AuthenticationError(
120 "Missing SharedAccessKeyName in connection string".to_string(),
121 )
122 })?;
123 let key = key.ok_or_else(|| {
124 AzureError::AuthenticationError("Missing SharedAccessKey in connection string".to_string())
125 })?;
126
127 let expiry = (Utc::now() + Duration::hours(1)).timestamp();
128 let string_to_sign = format!("{}\n{}", urlencoding::encode(namespace_url), expiry);
129
130 use base64::{engine::general_purpose::STANDARD, Engine};
131 use hmac::{Hmac, KeyInit, Mac};
132 use sha2::Sha256;
133 type HmacSha256 = Hmac<Sha256>;
134
135 let key_bytes = STANDARD
136 .decode(&key)
137 .map_err(|e| AzureError::AuthenticationError(format!("Invalid SharedAccessKey: {}", e)))?;
138 let mut mac = HmacSha256::new_from_slice(&key_bytes)
139 .map_err(|e| AzureError::AuthenticationError(format!("Failed to create HMAC: {}", e)))?;
140 mac.update(string_to_sign.as_bytes());
141 let signature = STANDARD.encode(mac.finalize().into_bytes());
142
143 Ok(format!(
144 "SharedAccessSignature sr={}&sig={}&se={}&skn={}",
145 urlencoding::encode(namespace_url),
146 urlencoding::encode(&signature),
147 expiry,
148 urlencoding::encode(&key_name)
149 ))
150}
151
152#[derive(Clone, Serialize, Deserialize)]
158pub enum AzureAuthMethod {
159 ConnectionString,
161 ManagedIdentity,
163 ClientSecret {
165 tenant_id: String,
166 client_id: String,
167 client_secret: String,
168 },
169 DefaultCredential,
171}
172
173impl fmt::Debug for AzureAuthMethod {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 match self {
176 Self::ConnectionString => f.debug_struct("ConnectionString").finish(),
177 Self::ManagedIdentity => f.debug_struct("ManagedIdentity").finish(),
178 Self::ClientSecret {
179 tenant_id,
180 client_id,
181 ..
182 } => f
183 .debug_struct("ClientSecret")
184 .field("tenant_id", tenant_id)
185 .field("client_id", client_id)
186 .field("client_secret", &"<REDACTED>")
187 .finish(),
188 Self::DefaultCredential => f.debug_struct("DefaultCredential").finish(),
189 }
190 }
191}
192
193impl fmt::Display for AzureAuthMethod {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 match self {
196 Self::ConnectionString => write!(f, "ConnectionString"),
197 Self::ManagedIdentity => write!(f, "ManagedIdentity"),
198 Self::ClientSecret { .. } => write!(f, "ClientSecret"),
199 Self::DefaultCredential => write!(f, "DefaultCredential"),
200 }
201 }
202}
203
204#[derive(Debug, thiserror::Error)]
210pub enum AzureError {
211 #[error("Authentication failed: {0}")]
212 AuthenticationError(String),
213
214 #[error("Network error: {0}")]
215 NetworkError(String),
216
217 #[error("Service Bus error: {0}")]
218 ServiceBusError(String),
219
220 #[error("Message lock lost: {0}")]
221 MessageLockLost(String),
222
223 #[error("Session lock lost: {0}")]
224 SessionLockLost(String),
225
226 #[error("Invalid configuration: {0}")]
227 ConfigurationError(String),
228
229 #[error("Serialization error: {0}")]
230 SerializationError(String),
231}
232
233impl AzureError {
234 pub fn is_transient(&self) -> bool {
236 match self {
237 Self::AuthenticationError(_) => false,
238 Self::NetworkError(_) => true,
239 Self::ServiceBusError(_) => true, Self::MessageLockLost(_) => false,
241 Self::SessionLockLost(_) => false,
242 Self::ConfigurationError(_) => false,
243 Self::SerializationError(_) => false,
244 }
245 }
246
247 pub fn to_queue_error(self) -> QueueError {
249 match self {
250 Self::AuthenticationError(msg) => QueueError::AuthenticationFailed { message: msg },
251 Self::NetworkError(msg) => QueueError::ConnectionFailed { message: msg },
252 Self::ServiceBusError(msg) => QueueError::ProviderError {
253 provider: "AzureServiceBus".to_string(),
254 code: "ServiceBusError".to_string(),
255 message: msg,
256 },
257 Self::MessageLockLost(msg) => QueueError::InvalidReceipt { receipt: msg },
258 Self::SessionLockLost(session_id) => QueueError::SessionNotFound { session_id },
259 Self::ConfigurationError(msg) => {
260 QueueError::ConfigurationError(ConfigurationError::Invalid { message: msg })
261 }
262 Self::SerializationError(msg) => QueueError::SerializationError(
263 SerializationError::JsonError(serde_json::Error::io(std::io::Error::new(
264 std::io::ErrorKind::InvalidData,
265 msg,
266 ))),
267 ),
268 }
269 }
270}
271
272pub struct AzureServiceBusProvider {
286 config: AzureServiceBusConfig,
287 http_client: HttpClient,
288 namespace_url: String,
289 credential: Option<Arc<dyn TokenCredential + Send + Sync>>,
290 lock_tokens: Arc<RwLock<HashMap<String, (String, String)>>>,
292}
293
294impl fmt::Debug for AzureServiceBusProvider {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 f.debug_struct("AzureServiceBusProvider")
297 .field("config", &self.config)
298 .field("namespace_url", &self.namespace_url)
299 .field(
300 "credential",
301 &self.credential.as_ref().map(|_| "<TokenCredential>"),
302 )
303 .field("lock_tokens", &self.lock_tokens)
304 .finish()
305 }
306}
307
308impl AzureServiceBusProvider {
309 pub async fn new(config: AzureServiceBusConfig) -> Result<Self, AzureError> {
343 Self::validate_config(&config)?;
345
346 let (namespace_url, credential) = match &config.auth_method {
348 AzureAuthMethod::ConnectionString => {
349 let conn_str = config.connection_string.as_ref().ok_or_else(|| {
350 AzureError::ConfigurationError(
351 "Connection string required for ConnectionString auth".to_string(),
352 )
353 })?;
354
355 let namespace_url = Self::parse_connection_string_endpoint(conn_str)?;
356 (namespace_url, None)
357 }
358 AzureAuthMethod::ManagedIdentity => {
359 let namespace = config.namespace.as_ref().ok_or_else(|| {
360 AzureError::ConfigurationError(
361 "Namespace required for ManagedIdentity auth".to_string(),
362 )
363 })?;
364
365 let credential = ManagedIdentityCredential::new(None).map_err(|e| {
366 AzureError::ConfigurationError(format!(
367 "Failed to create managed identity credential: {}",
368 e
369 ))
370 })?;
371 let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
372 (
373 namespace_url,
374 Some(credential as Arc<dyn TokenCredential + Send + Sync>),
375 )
376 }
377 AzureAuthMethod::ClientSecret {
378 ref tenant_id,
379 ref client_id,
380 ref client_secret,
381 } => {
382 let namespace = config.namespace.as_ref().ok_or_else(|| {
383 AzureError::ConfigurationError(
384 "Namespace required for ClientSecret auth".to_string(),
385 )
386 })?;
387
388 let credential = ClientSecretCredential::new(
390 tenant_id,
391 client_id.clone(),
392 AzureSecret::from(client_secret.clone()),
393 None::<ClientSecretCredentialOptions>,
394 )
395 .map_err(|e| {
396 AzureError::ConfigurationError(format!("Failed to create credential: {}", e))
397 })?;
398 let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
399 (
400 namespace_url,
401 Some(credential as Arc<dyn TokenCredential + Send + Sync>),
402 )
403 }
404 AzureAuthMethod::DefaultCredential => {
405 let namespace = config.namespace.as_ref().ok_or_else(|| {
406 AzureError::ConfigurationError(
407 "Namespace required for DefaultCredential auth".to_string(),
408 )
409 })?;
410
411 let credential = DeveloperToolsCredential::new(None).map_err(|e| {
414 AzureError::ConfigurationError(format!(
415 "Failed to create developer tools credential: {}",
416 e
417 ))
418 })?;
419 let namespace_url = format!("https://{}.servicebus.windows.net", namespace);
420 (
421 namespace_url,
422 Some(credential as Arc<dyn TokenCredential + Send + Sync>),
423 )
424 }
425 };
426
427 let http_client = HttpClient::builder()
429 .timeout(std::time::Duration::from_secs(30))
430 .build()
431 .map_err(|e| {
432 AzureError::NetworkError(format!("Failed to create HTTP client: {}", e))
433 })?;
434
435 Ok(Self {
436 config,
437 http_client,
438 namespace_url,
439 credential,
440 lock_tokens: Arc::new(RwLock::new(HashMap::new())),
441 })
442 }
443
444 fn parse_connection_string_endpoint(conn_str: &str) -> Result<String, AzureError> {
446 for part in conn_str.split(';') {
447 if let Some(endpoint) = part.strip_prefix("Endpoint=") {
448 return Ok(endpoint.trim_end_matches('/').to_string());
449 }
450 }
451 Err(AzureError::ConfigurationError(
452 "Invalid connection string: missing Endpoint".to_string(),
453 ))
454 }
455
456 fn validate_config(config: &AzureServiceBusConfig) -> Result<(), AzureError> {
458 match &config.auth_method {
459 AzureAuthMethod::ConnectionString => {
460 if config.connection_string.is_none() {
461 return Err(AzureError::ConfigurationError(
462 "Connection string required for ConnectionString auth method".to_string(),
463 ));
464 }
465 }
466 AzureAuthMethod::ManagedIdentity | AzureAuthMethod::DefaultCredential => {
467 if config.namespace.is_none() {
468 return Err(AzureError::ConfigurationError(
469 "Namespace required for ManagedIdentity/DefaultCredential auth".to_string(),
470 ));
471 }
472 }
473 AzureAuthMethod::ClientSecret {
474 tenant_id,
475 client_id,
476 client_secret,
477 } => {
478 if config.namespace.is_none() {
479 return Err(AzureError::ConfigurationError(
480 "Namespace required for ClientSecret auth".to_string(),
481 ));
482 }
483 if tenant_id.is_empty() || client_id.is_empty() || client_secret.is_empty() {
484 return Err(AzureError::ConfigurationError(
485 "Tenant ID, Client ID, and Client Secret required for ClientSecret auth"
486 .to_string(),
487 ));
488 }
489 }
490 }
491
492 Ok(())
493 }
494
495 async fn get_auth_token(&self) -> Result<String, AzureError> {
497 match &self.credential {
498 Some(cred) => get_bearer_token(cred.as_ref()).await,
499 None => {
500 self.get_sas_token()
502 }
503 }
504 }
505
506 fn get_sas_token(&self) -> Result<String, AzureError> {
510 let conn_str = self.config.connection_string.as_ref().ok_or_else(|| {
511 AzureError::AuthenticationError("No connection string available".to_string())
512 })?;
513 generate_sas_token(&self.namespace_url, conn_str)
514 }
515}
516
517#[derive(Debug, Serialize, Deserialize)]
523struct ServiceBusMessageBody {
524 #[serde(rename = "ContentType")]
525 content_type: String,
526 #[serde(rename = "Body")]
527 body: String, #[serde(rename = "BrokerProperties")]
529 broker_properties: BrokerProperties,
530}
531
532#[derive(Debug, Serialize, Deserialize)]
533struct BrokerProperties {
534 #[serde(rename = "MessageId")]
535 message_id: String,
536 #[serde(rename = "SessionId", skip_serializing_if = "Option::is_none")]
537 session_id: Option<String>,
538 #[serde(rename = "TimeToLive", skip_serializing_if = "Option::is_none")]
539 time_to_live: Option<u64>,
540}
541
542#[derive(Debug, Deserialize)]
544struct ServiceBusMessageResponse {
545 #[serde(rename = "Body")]
546 body: String,
547 #[serde(rename = "BrokerProperties")]
548 broker_properties: ReceivedBrokerProperties,
549}
550
551#[allow(dead_code)] #[derive(Debug, Deserialize)]
553struct ReceivedServiceBusMessage {
554 #[serde(rename = "Body")]
555 body: String,
556 #[serde(rename = "BrokerProperties")]
557 broker_properties: ReceivedBrokerProperties,
558}
559
560#[allow(dead_code)] #[derive(Debug, Deserialize)]
562struct ReceivedBrokerProperties {
563 #[serde(rename = "MessageId")]
564 message_id: String,
565 #[serde(rename = "SessionId")]
566 session_id: Option<String>,
567 #[serde(rename = "LockToken")]
568 lock_token: String,
569 #[serde(rename = "DeliveryCount")]
570 delivery_count: u32,
571 #[serde(rename = "EnqueuedTimeUtc")]
572 enqueued_time_utc: String,
573}
574
575#[async_trait]
580impl QueueProvider for AzureServiceBusProvider {
581 async fn send_message(
582 &self,
583 queue: &QueueName,
584 message: &Message,
585 ) -> Result<MessageId, QueueError> {
586 let message_id = MessageId::new();
588
589 use base64::{engine::general_purpose::STANDARD, Engine};
591 let body_base64 = STANDARD.encode(&message.body);
592
593 let broker_props = BrokerProperties {
595 message_id: message_id.to_string(),
596 session_id: message.session_id.as_ref().map(|s| s.to_string()),
597 time_to_live: message
598 .time_to_live
599 .as_ref()
600 .map(|ttl| ttl.num_seconds() as u64),
601 };
602
603 let url = format!("{}/{}/messages", self.namespace_url, queue.as_str());
605
606 let auth_token = self
608 .get_auth_token()
609 .await
610 .map_err(|e| e.to_queue_error())?;
611
612 let response = self
614 .http_client
615 .post(&url)
616 .header(header::AUTHORIZATION, auth_token)
617 .header(
618 header::CONTENT_TYPE,
619 "application/atom+xml;type=entry;charset=utf-8",
620 )
621 .header(
622 "BrokerProperties",
623 serde_json::to_string(&broker_props).unwrap(),
624 )
625 .body(body_base64)
626 .send()
627 .await
628 .map_err(|e| {
629 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
630 })?;
631
632 match response.status() {
634 StatusCode::CREATED | StatusCode::OK => Ok(message_id),
635 status => {
636 let error_body = response.text().await.unwrap_or_default();
637 Err(QueueError::ProviderError {
638 provider: "AzureServiceBus".to_string(),
639 code: status.as_str().to_string(),
640 message: format!("Send failed: {}", error_body),
641 })
642 }
643 }
644 }
645
646 async fn send_messages(
647 &self,
648 queue: &QueueName,
649 messages: &[Message],
650 ) -> Result<Vec<MessageId>, QueueError> {
651 if messages.len() > 100 {
653 return Err(QueueError::BatchTooLarge {
654 size: messages.len(),
655 max_size: 100,
656 });
657 }
658
659 if messages.is_empty() {
660 return Ok(Vec::new());
661 }
662
663 let mut batch_messages = Vec::with_capacity(messages.len());
665 let mut message_ids = Vec::with_capacity(messages.len());
666
667 use base64::{engine::general_purpose::STANDARD, Engine};
668
669 for message in messages {
670 let message_id = MessageId::new();
671 let body_base64 = STANDARD.encode(&message.body);
672
673 let broker_props = BrokerProperties {
674 message_id: message_id.to_string(),
675 session_id: message.session_id.as_ref().map(|s| s.to_string()),
676 time_to_live: message
677 .time_to_live
678 .as_ref()
679 .map(|ttl| ttl.num_seconds() as u64),
680 };
681
682 batch_messages.push(ServiceBusMessageBody {
683 content_type: "application/octet-stream".to_string(),
684 body: body_base64,
685 broker_properties: broker_props,
686 });
687
688 message_ids.push(message_id);
689 }
690
691 let url = format!("{}/{}/messages", self.namespace_url, queue.as_str());
693
694 let auth_token = self
696 .get_auth_token()
697 .await
698 .map_err(|e| e.to_queue_error())?;
699
700 let response = self
702 .http_client
703 .post(&url)
704 .header(header::AUTHORIZATION, auth_token)
705 .header(header::CONTENT_TYPE, "application/json")
706 .json(&batch_messages)
707 .send()
708 .await
709 .map_err(|e| {
710 AzureError::NetworkError(format!("Batch send HTTP request failed: {}", e))
711 .to_queue_error()
712 })?;
713
714 match response.status() {
716 StatusCode::CREATED | StatusCode::OK => Ok(message_ids),
717 StatusCode::PAYLOAD_TOO_LARGE => Err(QueueError::BatchTooLarge {
718 size: messages.len(),
719 max_size: 100,
720 }),
721 StatusCode::TOO_MANY_REQUESTS => {
722 let retry_after = response
723 .headers()
724 .get("Retry-After")
725 .and_then(|v| v.to_str().ok())
726 .and_then(|s| s.parse::<u64>().ok())
727 .unwrap_or(30);
728
729 Err(QueueError::ProviderError {
730 provider: "AzureServiceBus".to_string(),
731 code: "ThrottlingError".to_string(),
732 message: format!("Request throttled, retry after {} seconds", retry_after),
733 })
734 }
735 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
736 let error_body = response.text().await.unwrap_or_default();
737 Err(QueueError::AuthenticationFailed {
738 message: format!("Authentication failed: {}", error_body),
739 })
740 }
741 status => {
742 let error_body = response.text().await.unwrap_or_default();
743 Err(QueueError::ProviderError {
744 provider: "AzureServiceBus".to_string(),
745 code: status.as_str().to_string(),
746 message: format!("Batch send failed: {}", error_body),
747 })
748 }
749 }
750 }
751
752 async fn receive_message(
753 &self,
754 queue: &QueueName,
755 timeout: Duration,
756 ) -> Result<Option<ReceivedMessage>, QueueError> {
757 let url = format!(
760 "{}/{}/messages/head?timeout={}",
761 self.namespace_url,
762 queue.as_str(),
763 timeout.num_seconds()
764 );
765
766 let auth_token = self
768 .get_auth_token()
769 .await
770 .map_err(|e| e.to_queue_error())?;
771
772 let response = self
774 .http_client
775 .delete(&url)
776 .header(header::AUTHORIZATION, auth_token)
777 .send()
778 .await
779 .map_err(|e| {
780 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
781 })?;
782
783 match response.status() {
785 StatusCode::OK | StatusCode::CREATED => {
786 let broker_props = response
788 .headers()
789 .get("BrokerProperties")
790 .and_then(|v| v.to_str().ok())
791 .and_then(|s| serde_json::from_str::<ReceivedBrokerProperties>(s).ok())
792 .ok_or_else(|| QueueError::ProviderError {
793 provider: "AzureServiceBus".to_string(),
794 code: "InvalidResponse".to_string(),
795 message: "Missing or invalid BrokerProperties header".to_string(),
796 })?;
797
798 let body_base64 = response.text().await.map_err(|e| {
800 AzureError::NetworkError(format!("Failed to read response body: {}", e))
801 .to_queue_error()
802 })?;
803
804 use base64::{engine::general_purpose::STANDARD, Engine};
806 let body =
807 STANDARD
808 .decode(&body_base64)
809 .map_err(|e| QueueError::ProviderError {
810 provider: "AzureServiceBus".to_string(),
811 code: "DecodingError".to_string(),
812 message: format!("Failed to decode message body: {}", e),
813 })?;
814
815 let first_delivered_at =
817 chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
818 .map(|dt| Timestamp::from_datetime(dt.with_timezone(&chrono::Utc)))
819 .unwrap_or_else(|_| Timestamp::now());
820
821 let expires_at = Timestamp::from_datetime(Utc::now() + Duration::seconds(30));
824 let receipt_str = format!("{}::{}", broker_props.lock_token, queue.as_str());
825 let receipt = ReceiptHandle::new(
826 receipt_str.clone(),
827 expires_at,
828 ProviderType::AzureServiceBus,
829 );
830
831 self.lock_tokens.write().await.insert(
833 receipt_str,
834 (broker_props.lock_token.clone(), queue.as_str().to_string()),
835 );
836
837 let message_id = MessageId::from_str(&broker_props.message_id)
839 .unwrap_or_else(|_| MessageId::new());
840
841 let received_message = ReceivedMessage {
843 message_id,
844 body: bytes::Bytes::from(body),
845 attributes: HashMap::new(),
846 session_id: broker_props.session_id.map(SessionId::new).transpose()?,
847 correlation_id: None,
848 receipt_handle: receipt,
849 delivery_count: broker_props.delivery_count,
850 first_delivered_at,
851 delivered_at: Timestamp::now(),
852 };
853
854 Ok(Some(received_message))
855 }
856 StatusCode::NO_CONTENT => {
857 Ok(None)
859 }
860 status => {
861 let error_body = response.text().await.unwrap_or_default();
862 Err(QueueError::ProviderError {
863 provider: "AzureServiceBus".to_string(),
864 code: status.as_str().to_string(),
865 message: format!("Receive failed: {}", error_body),
866 })
867 }
868 }
869 }
870
871 async fn receive_messages(
872 &self,
873 queue: &QueueName,
874 max_messages: u32,
875 timeout: Duration,
876 ) -> Result<Vec<ReceivedMessage>, QueueError> {
877 if max_messages > 32 {
879 return Err(QueueError::BatchTooLarge {
880 size: max_messages as usize,
881 max_size: 32,
882 });
883 }
884
885 if max_messages == 0 {
886 return Ok(Vec::new());
887 }
888
889 let url = format!(
892 "{}/{}/messages/head?timeout={}&maxMessageCount={}",
893 self.namespace_url,
894 queue.as_str(),
895 timeout.num_seconds(),
896 max_messages
897 );
898
899 let auth_token = self
901 .get_auth_token()
902 .await
903 .map_err(|e| e.to_queue_error())?;
904
905 let response = self
907 .http_client
908 .delete(&url)
909 .header(header::AUTHORIZATION, auth_token)
910 .send()
911 .await
912 .map_err(|e| {
913 AzureError::NetworkError(format!("Batch receive HTTP request failed: {}", e))
914 .to_queue_error()
915 })?;
916
917 match response.status() {
919 StatusCode::OK | StatusCode::CREATED => {
920 let messages_data: Vec<ServiceBusMessageResponse> =
922 response.json().await.map_err(|e| {
923 AzureError::SerializationError(format!(
924 "Failed to parse batch receive response: {}",
925 e
926 ))
927 .to_queue_error()
928 })?;
929
930 let mut received_messages = Vec::with_capacity(messages_data.len());
931
932 use base64::{engine::general_purpose::STANDARD, Engine};
933
934 for msg_data in messages_data {
935 let broker_props = msg_data.broker_properties;
936
937 let body = STANDARD.decode(&msg_data.body).map_err(|e| {
939 AzureError::SerializationError(format!(
940 "Failed to decode message body: {}",
941 e
942 ))
943 .to_queue_error()
944 })?;
945
946 let enqueued_time =
948 chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
949 .map_err(|e| {
950 AzureError::SerializationError(format!(
951 "Failed to parse enqueued time: {}",
952 e
953 ))
954 .to_queue_error()
955 })?;
956 let first_delivered_at =
957 Timestamp::from_datetime(enqueued_time.with_timezone(&Utc));
958
959 let expires_at = Timestamp::from_datetime(Utc::now() + Duration::seconds(30));
961 let receipt_str = format!("{}::{}", broker_props.lock_token, queue.as_str());
962 let receipt = ReceiptHandle::new(
963 receipt_str.clone(),
964 expires_at,
965 ProviderType::AzureServiceBus,
966 );
967
968 self.lock_tokens.write().await.insert(
970 receipt_str,
971 (broker_props.lock_token.clone(), queue.as_str().to_string()),
972 );
973
974 let message_id = MessageId::from_str(&broker_props.message_id)
976 .unwrap_or_else(|_| MessageId::new());
977
978 let received_message = ReceivedMessage {
980 message_id,
981 body: bytes::Bytes::from(body),
982 attributes: HashMap::new(),
983 session_id: broker_props.session_id.map(SessionId::new).transpose()?,
984 correlation_id: None,
985 receipt_handle: receipt,
986 delivery_count: broker_props.delivery_count,
987 first_delivered_at,
988 delivered_at: Timestamp::now(),
989 };
990
991 received_messages.push(received_message);
992 }
993
994 Ok(received_messages)
995 }
996 StatusCode::NO_CONTENT => {
997 Ok(Vec::new())
999 }
1000 StatusCode::TOO_MANY_REQUESTS => {
1001 let retry_after = response
1002 .headers()
1003 .get("Retry-After")
1004 .and_then(|v| v.to_str().ok())
1005 .and_then(|s| s.parse::<u64>().ok())
1006 .unwrap_or(30);
1007
1008 Err(QueueError::ProviderError {
1009 provider: "AzureServiceBus".to_string(),
1010 code: "ThrottlingError".to_string(),
1011 message: format!("Request throttled, retry after {} seconds", retry_after),
1012 })
1013 }
1014 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
1015 let error_body = response.text().await.unwrap_or_default();
1016 Err(QueueError::AuthenticationFailed {
1017 message: format!("Authentication failed: {}", error_body),
1018 })
1019 }
1020 status => {
1021 let error_body = response.text().await.unwrap_or_default();
1022 Err(QueueError::ProviderError {
1023 provider: "AzureServiceBus".to_string(),
1024 code: status.as_str().to_string(),
1025 message: format!("Batch receive failed: {}", error_body),
1026 })
1027 }
1028 }
1029 }
1030
1031 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1032 let lock_tokens = self.lock_tokens.read().await;
1034 let (lock_token, queue_name) =
1035 lock_tokens
1036 .get(receipt.handle())
1037 .ok_or_else(|| QueueError::InvalidReceipt {
1038 receipt: receipt.handle().to_string(),
1039 })?;
1040
1041 let url = format!(
1043 "{}/{}/messages/head/{}",
1044 self.namespace_url,
1045 queue_name,
1046 urlencoding::encode(lock_token)
1047 );
1048
1049 let auth_token = self
1051 .get_auth_token()
1052 .await
1053 .map_err(|e| e.to_queue_error())?;
1054
1055 let response = self
1057 .http_client
1058 .delete(&url)
1059 .header(header::AUTHORIZATION, auth_token)
1060 .send()
1061 .await
1062 .map_err(|e| {
1063 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1064 })?;
1065
1066 match response.status() {
1068 StatusCode::OK | StatusCode::NO_CONTENT => {
1069 drop(lock_tokens);
1071 self.lock_tokens.write().await.remove(receipt.handle());
1072 Ok(())
1073 }
1074 StatusCode::GONE | StatusCode::NOT_FOUND => {
1075 Err(QueueError::InvalidReceipt {
1077 receipt: receipt.handle().to_string(),
1078 })
1079 }
1080 status => {
1081 let error_body = response.text().await.unwrap_or_default();
1082 Err(QueueError::ProviderError {
1083 provider: "AzureServiceBus".to_string(),
1084 code: status.as_str().to_string(),
1085 message: format!("Complete failed: {}", error_body),
1086 })
1087 }
1088 }
1089 }
1090
1091 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1092 let lock_tokens = self.lock_tokens.read().await;
1094 let (lock_token, queue_name) =
1095 lock_tokens
1096 .get(receipt.handle())
1097 .ok_or_else(|| QueueError::InvalidReceipt {
1098 receipt: receipt.handle().to_string(),
1099 })?;
1100
1101 let url = format!(
1104 "{}/{}/messages/head/{}",
1105 self.namespace_url,
1106 queue_name,
1107 urlencoding::encode(lock_token)
1108 );
1109
1110 let auth_token = self
1112 .get_auth_token()
1113 .await
1114 .map_err(|e| e.to_queue_error())?;
1115
1116 let response = self
1118 .http_client
1119 .put(&url)
1120 .header(header::AUTHORIZATION, auth_token)
1121 .header(header::CONTENT_LENGTH, "0")
1122 .send()
1123 .await
1124 .map_err(|e| {
1125 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1126 })?;
1127
1128 match response.status() {
1130 StatusCode::OK | StatusCode::NO_CONTENT => {
1131 drop(lock_tokens);
1133 self.lock_tokens.write().await.remove(receipt.handle());
1134 Ok(())
1135 }
1136 StatusCode::GONE | StatusCode::NOT_FOUND => {
1137 Err(QueueError::InvalidReceipt {
1139 receipt: receipt.handle().to_string(),
1140 })
1141 }
1142 status => {
1143 let error_body = response.text().await.unwrap_or_default();
1144 Err(QueueError::ProviderError {
1145 provider: "AzureServiceBus".to_string(),
1146 code: status.as_str().to_string(),
1147 message: format!("Abandon failed: {}", error_body),
1148 })
1149 }
1150 }
1151 }
1152
1153 async fn dead_letter_message(
1154 &self,
1155 receipt: &ReceiptHandle,
1156 reason: &str,
1157 ) -> Result<(), QueueError> {
1158 let lock_tokens = self.lock_tokens.read().await;
1160 let (lock_token, queue_name) =
1161 lock_tokens
1162 .get(receipt.handle())
1163 .ok_or_else(|| QueueError::InvalidReceipt {
1164 receipt: receipt.handle().to_string(),
1165 })?;
1166
1167 let url = format!(
1170 "{}/{}/messages/head/{}/$deadletter",
1171 self.namespace_url,
1172 queue_name,
1173 urlencoding::encode(lock_token)
1174 );
1175
1176 let auth_token = self
1178 .get_auth_token()
1179 .await
1180 .map_err(|e| e.to_queue_error())?;
1181
1182 let properties = serde_json::json!({
1184 "DeadLetterReason": reason,
1185 "DeadLetterErrorDescription": "Message processing failed"
1186 });
1187
1188 let response = self
1190 .http_client
1191 .post(&url)
1192 .header(header::AUTHORIZATION, auth_token)
1193 .header(header::CONTENT_TYPE, "application/json")
1194 .json(&properties)
1195 .send()
1196 .await
1197 .map_err(|e| {
1198 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1199 })?;
1200
1201 match response.status() {
1203 StatusCode::OK | StatusCode::NO_CONTENT | StatusCode::CREATED => {
1204 drop(lock_tokens);
1206 self.lock_tokens.write().await.remove(receipt.handle());
1207 Ok(())
1208 }
1209 StatusCode::GONE | StatusCode::NOT_FOUND => {
1210 Err(QueueError::InvalidReceipt {
1212 receipt: receipt.handle().to_string(),
1213 })
1214 }
1215 status => {
1216 let error_body = response.text().await.unwrap_or_default();
1217 Err(QueueError::ProviderError {
1218 provider: "AzureServiceBus".to_string(),
1219 code: status.as_str().to_string(),
1220 message: format!("Dead letter failed: {}", error_body),
1221 })
1222 }
1223 }
1224 }
1225
1226 async fn create_session_client(
1227 &self,
1228 queue: &QueueName,
1229 session_id: Option<SessionId>,
1230 ) -> Result<Box<dyn SessionProvider>, QueueError> {
1231 let resolved_id = match session_id {
1232 Some(id) => id,
1233 None => self.accept_next_available_session(queue).await?,
1234 };
1235
1236 Ok(Box::new(AzureSessionProvider::new(
1237 resolved_id,
1238 queue.clone(),
1239 self.config.session_timeout,
1240 self.http_client.clone(),
1241 self.namespace_url.clone(),
1242 self.config.clone(),
1243 self.credential.clone(),
1244 )))
1245 }
1246
1247 fn provider_type(&self) -> ProviderType {
1248 ProviderType::AzureServiceBus
1249 }
1250
1251 fn supports_sessions(&self) -> SessionSupport {
1252 SessionSupport::Native
1253 }
1254
1255 fn supports_batching(&self) -> bool {
1256 true
1257 }
1258
1259 fn max_batch_size(&self) -> u32 {
1260 100 }
1262}
1263
1264impl AzureServiceBusProvider {
1265 async fn accept_next_available_session(
1287 &self,
1288 queue: &QueueName,
1289 ) -> Result<SessionId, QueueError> {
1290 let timeout_secs = self.config.session_timeout.num_seconds().max(1);
1293 let url = format!(
1294 "{}/{}/sessions/$acceptnext/messages/head?timeout={}",
1295 self.namespace_url,
1296 queue.as_str(),
1297 timeout_secs
1298 );
1299
1300 let auth_token = self
1301 .get_auth_token()
1302 .await
1303 .map_err(|e| e.to_queue_error())?;
1304
1305 let response = self
1306 .http_client
1307 .delete(&url)
1308 .header(header::AUTHORIZATION, auth_token)
1309 .send()
1310 .await
1311 .map_err(|e| {
1312 AzureError::NetworkError(format!("Failed to accept next session: {}", e))
1313 .to_queue_error()
1314 })?;
1315
1316 match response.status() {
1317 StatusCode::OK | StatusCode::CREATED => {
1318 let broker_props = response
1320 .headers()
1321 .get("BrokerProperties")
1322 .and_then(|v| v.to_str().ok())
1323 .and_then(|s| serde_json::from_str::<ReceivedBrokerProperties>(s).ok())
1324 .ok_or_else(|| QueueError::ProviderError {
1325 provider: "AzureServiceBus".to_string(),
1326 code: "InvalidResponse".to_string(),
1327 message: "Missing BrokerProperties in accept-next-session response"
1328 .to_string(),
1329 })?;
1330
1331 let session_id_str =
1332 broker_props
1333 .session_id
1334 .ok_or_else(|| QueueError::ProviderError {
1335 provider: "AzureServiceBus".to_string(),
1336 code: "NoSessionId".to_string(),
1337 message: "Accepted message has no SessionId".to_string(),
1338 })?;
1339
1340 SessionId::new(session_id_str).map_err(|e| QueueError::ProviderError {
1341 provider: "AzureServiceBus".to_string(),
1342 code: "InvalidSessionId".to_string(),
1343 message: format!("Invalid session ID returned by broker: {}", e),
1344 })
1345 }
1346 StatusCode::NO_CONTENT => Err(QueueError::ProviderError {
1347 provider: "AzureServiceBus".to_string(),
1348 code: "NoSessionsAvailable".to_string(),
1349 message: "No sessions with pending messages are available".to_string(),
1350 }),
1351 StatusCode::NOT_FOUND => Err(QueueError::QueueNotFound {
1352 queue_name: queue.to_string(),
1353 }),
1354 status => {
1355 let error_body = response.text().await.unwrap_or_default();
1356 Err(QueueError::ProviderError {
1357 provider: "AzureServiceBus".to_string(),
1358 code: status.as_str().to_string(),
1359 message: format!("Accept next session failed: {}", error_body),
1360 })
1361 }
1362 }
1363 }
1364}
1365
1366pub struct AzureSessionProvider {
1386 session_id: SessionId,
1387 queue_name: QueueName,
1388 session_expires_at: Arc<std::sync::RwLock<Timestamp>>,
1391 http_client: HttpClient,
1392 namespace_url: String,
1393 config: AzureServiceBusConfig,
1394 credential: Option<Arc<dyn TokenCredential + Send + Sync>>,
1395 lock_tokens: Arc<RwLock<HashSet<String>>>,
1398}
1399
1400impl fmt::Debug for AzureSessionProvider {
1401 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1402 f.debug_struct("AzureSessionProvider")
1403 .field("session_id", &self.session_id)
1404 .field("queue_name", &self.queue_name)
1405 .field("namespace_url", &self.namespace_url)
1406 .field(
1407 "credential",
1408 &self.credential.as_ref().map(|_| "<TokenCredential>"),
1409 )
1410 .finish()
1411 }
1412}
1413
1414impl AzureSessionProvider {
1415 pub fn new(
1431 session_id: SessionId,
1432 queue_name: QueueName,
1433 session_timeout: Duration,
1434 http_client: HttpClient,
1435 namespace_url: String,
1436 config: AzureServiceBusConfig,
1437 credential: Option<Arc<dyn TokenCredential + Send + Sync>>,
1438 ) -> Self {
1439 let session_expires_at = Timestamp::from_datetime(Utc::now() + session_timeout);
1440 Self {
1441 session_id,
1442 queue_name,
1443 session_expires_at: Arc::new(std::sync::RwLock::new(session_expires_at)),
1444 http_client,
1445 namespace_url,
1446 config,
1447 credential,
1448 lock_tokens: Arc::new(RwLock::new(HashSet::new())),
1449 }
1450 }
1451
1452 async fn get_auth_token(&self) -> Result<String, AzureError> {
1456 match &self.credential {
1457 Some(cred) => get_bearer_token(cred.as_ref()).await,
1458 None => {
1459 let conn_str = self.config.connection_string.as_ref().ok_or_else(|| {
1460 AzureError::AuthenticationError("No connection string available".to_string())
1461 })?;
1462 generate_sas_token(&self.namespace_url, conn_str)
1463 }
1464 }
1465 }
1466
1467 fn refresh_session_expiry(&self) {
1469 if let Ok(mut expiry) = self.session_expires_at.write() {
1470 *expiry = Timestamp::from_datetime(Utc::now() + self.config.session_timeout);
1471 }
1472 }
1473}
1474
1475#[async_trait]
1476impl SessionProvider for AzureSessionProvider {
1477 async fn receive_message(
1491 &self,
1492 timeout: Duration,
1493 ) -> Result<Option<ReceivedMessage>, QueueError> {
1494 let url = format!(
1495 "{}/{}/sessions/{}/messages/head?timeout={}",
1496 self.namespace_url,
1497 self.queue_name.as_str(),
1498 urlencoding::encode(self.session_id.as_str()),
1499 timeout.num_seconds()
1500 );
1501
1502 let auth_token = self
1503 .get_auth_token()
1504 .await
1505 .map_err(|e| e.to_queue_error())?;
1506
1507 let response = self
1508 .http_client
1509 .delete(&url)
1510 .header(header::AUTHORIZATION, auth_token)
1511 .send()
1512 .await
1513 .map_err(|e| {
1514 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1515 })?;
1516
1517 match response.status() {
1518 StatusCode::OK | StatusCode::CREATED => {
1519 let broker_props = response
1520 .headers()
1521 .get("BrokerProperties")
1522 .and_then(|v| v.to_str().ok())
1523 .and_then(|s| serde_json::from_str::<ReceivedBrokerProperties>(s).ok())
1524 .ok_or_else(|| QueueError::ProviderError {
1525 provider: "AzureServiceBus".to_string(),
1526 code: "InvalidResponse".to_string(),
1527 message: "Missing or invalid BrokerProperties header".to_string(),
1528 })?;
1529
1530 let body_base64 = response.text().await.map_err(|e| {
1531 AzureError::NetworkError(format!("Failed to read response body: {}", e))
1532 .to_queue_error()
1533 })?;
1534
1535 use base64::{engine::general_purpose::STANDARD, Engine};
1536 let body =
1537 STANDARD
1538 .decode(&body_base64)
1539 .map_err(|e| QueueError::ProviderError {
1540 provider: "AzureServiceBus".to_string(),
1541 code: "DecodingError".to_string(),
1542 message: format!("Failed to decode message body: {}", e),
1543 })?;
1544
1545 let first_delivered_at =
1546 chrono::DateTime::parse_from_rfc3339(&broker_props.enqueued_time_utc)
1547 .map(|dt| Timestamp::from_datetime(dt.with_timezone(&chrono::Utc)))
1548 .unwrap_or_else(|_| Timestamp::now());
1549
1550 let expires_at = Timestamp::from_datetime(Utc::now() + self.config.session_timeout);
1554 let lock_token = broker_props.lock_token.clone();
1555 let receipt = ReceiptHandle::new(
1556 lock_token.clone(),
1557 expires_at,
1558 ProviderType::AzureServiceBus,
1559 );
1560
1561 self.lock_tokens.write().await.insert(lock_token);
1562
1563 let message_id = MessageId::from_str(&broker_props.message_id)
1564 .unwrap_or_else(|_| MessageId::new());
1565
1566 self.refresh_session_expiry();
1568
1569 Ok(Some(ReceivedMessage {
1570 message_id,
1571 body: bytes::Bytes::from(body),
1572 attributes: HashMap::new(),
1573 session_id: Some(self.session_id.clone()),
1574 correlation_id: None,
1575 receipt_handle: receipt,
1576 delivery_count: broker_props.delivery_count,
1577 first_delivered_at,
1578 delivered_at: Timestamp::now(),
1579 }))
1580 }
1581 StatusCode::NO_CONTENT => Ok(None),
1582 StatusCode::GONE | StatusCode::NOT_FOUND => Err(QueueError::SessionNotFound {
1583 session_id: self.session_id.to_string(),
1584 }),
1585 status => {
1586 let error_body = response.text().await.unwrap_or_default();
1587 Err(QueueError::ProviderError {
1588 provider: "AzureServiceBus".to_string(),
1589 code: status.as_str().to_string(),
1590 message: format!("Session receive failed: {}", error_body),
1591 })
1592 }
1593 }
1594 }
1595
1596 async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1604 if !self.lock_tokens.read().await.contains(receipt.handle()) {
1605 return Err(QueueError::InvalidReceipt {
1606 receipt: receipt.handle().to_string(),
1607 });
1608 }
1609 let lock_token = receipt.handle().to_string();
1610
1611 let url = format!(
1612 "{}/{}/sessions/{}/messages/{}",
1613 self.namespace_url,
1614 self.queue_name.as_str(),
1615 urlencoding::encode(self.session_id.as_str()),
1616 urlencoding::encode(&lock_token)
1617 );
1618
1619 let auth_token = self
1620 .get_auth_token()
1621 .await
1622 .map_err(|e| e.to_queue_error())?;
1623
1624 let response = self
1625 .http_client
1626 .delete(&url)
1627 .header(header::AUTHORIZATION, auth_token)
1628 .send()
1629 .await
1630 .map_err(|e| {
1631 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1632 })?;
1633
1634 match response.status() {
1635 StatusCode::OK | StatusCode::NO_CONTENT => {
1636 self.lock_tokens.write().await.remove(receipt.handle());
1637 Ok(())
1638 }
1639 StatusCode::GONE | StatusCode::NOT_FOUND => Err(QueueError::InvalidReceipt {
1640 receipt: receipt.handle().to_string(),
1641 }),
1642 status => {
1643 let error_body = response.text().await.unwrap_or_default();
1644 Err(QueueError::ProviderError {
1645 provider: "AzureServiceBus".to_string(),
1646 code: status.as_str().to_string(),
1647 message: format!("Session complete failed: {}", error_body),
1648 })
1649 }
1650 }
1651 }
1652
1653 async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
1661 if !self.lock_tokens.read().await.contains(receipt.handle()) {
1662 return Err(QueueError::InvalidReceipt {
1663 receipt: receipt.handle().to_string(),
1664 });
1665 }
1666 let lock_token = receipt.handle().to_string();
1667
1668 let url = format!(
1669 "{}/{}/sessions/{}/messages/{}",
1670 self.namespace_url,
1671 self.queue_name.as_str(),
1672 urlencoding::encode(self.session_id.as_str()),
1673 urlencoding::encode(&lock_token)
1674 );
1675
1676 let auth_token = self
1677 .get_auth_token()
1678 .await
1679 .map_err(|e| e.to_queue_error())?;
1680
1681 let response = self
1682 .http_client
1683 .put(&url)
1684 .header(header::AUTHORIZATION, auth_token)
1685 .header(header::CONTENT_LENGTH, "0")
1686 .send()
1687 .await
1688 .map_err(|e| {
1689 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1690 })?;
1691
1692 match response.status() {
1693 StatusCode::OK | StatusCode::NO_CONTENT => {
1694 self.lock_tokens.write().await.remove(receipt.handle());
1695 Ok(())
1696 }
1697 StatusCode::GONE | StatusCode::NOT_FOUND => Err(QueueError::InvalidReceipt {
1698 receipt: receipt.handle().to_string(),
1699 }),
1700 status => {
1701 let error_body = response.text().await.unwrap_or_default();
1702 Err(QueueError::ProviderError {
1703 provider: "AzureServiceBus".to_string(),
1704 code: status.as_str().to_string(),
1705 message: format!("Session abandon failed: {}", error_body),
1706 })
1707 }
1708 }
1709 }
1710
1711 async fn dead_letter_message(
1720 &self,
1721 receipt: &ReceiptHandle,
1722 reason: &str,
1723 ) -> Result<(), QueueError> {
1724 if !self.lock_tokens.read().await.contains(receipt.handle()) {
1725 return Err(QueueError::InvalidReceipt {
1726 receipt: receipt.handle().to_string(),
1727 });
1728 }
1729 let lock_token = receipt.handle().to_string();
1730
1731 let url = format!(
1732 "{}/{}/sessions/{}/messages/{}/$deadletter",
1733 self.namespace_url,
1734 self.queue_name.as_str(),
1735 urlencoding::encode(self.session_id.as_str()),
1736 urlencoding::encode(&lock_token)
1737 );
1738
1739 let auth_token = self
1740 .get_auth_token()
1741 .await
1742 .map_err(|e| e.to_queue_error())?;
1743
1744 let properties = serde_json::json!({
1745 "DeadLetterReason": reason,
1746 "DeadLetterErrorDescription": "Message processing failed"
1747 });
1748
1749 let response = self
1750 .http_client
1751 .post(&url)
1752 .header(header::AUTHORIZATION, auth_token)
1753 .header(header::CONTENT_TYPE, "application/json")
1754 .json(&properties)
1755 .send()
1756 .await
1757 .map_err(|e| {
1758 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1759 })?;
1760
1761 match response.status() {
1762 StatusCode::OK | StatusCode::NO_CONTENT | StatusCode::CREATED => {
1763 self.lock_tokens.write().await.remove(receipt.handle());
1764 Ok(())
1765 }
1766 StatusCode::GONE | StatusCode::NOT_FOUND => Err(QueueError::InvalidReceipt {
1767 receipt: receipt.handle().to_string(),
1768 }),
1769 status => {
1770 let error_body = response.text().await.unwrap_or_default();
1771 Err(QueueError::ProviderError {
1772 provider: "AzureServiceBus".to_string(),
1773 code: status.as_str().to_string(),
1774 message: format!("Session dead letter failed: {}", error_body),
1775 })
1776 }
1777 }
1778 }
1779
1780 async fn renew_session_lock(&self) -> Result<(), QueueError> {
1789 let url = format!(
1790 "{}/{}/sessions/{}/renewlock",
1791 self.namespace_url,
1792 self.queue_name.as_str(),
1793 urlencoding::encode(self.session_id.as_str())
1794 );
1795
1796 let auth_token = self
1797 .get_auth_token()
1798 .await
1799 .map_err(|e| e.to_queue_error())?;
1800
1801 let response = self
1802 .http_client
1803 .post(&url)
1804 .header(header::AUTHORIZATION, auth_token)
1805 .header(header::CONTENT_LENGTH, "0")
1806 .send()
1807 .await
1808 .map_err(|e| {
1809 AzureError::NetworkError(format!("HTTP request failed: {}", e)).to_queue_error()
1810 })?;
1811
1812 match response.status() {
1813 StatusCode::OK | StatusCode::NO_CONTENT => {
1814 self.refresh_session_expiry();
1815 Ok(())
1816 }
1817 StatusCode::GONE | StatusCode::NOT_FOUND => Err(QueueError::SessionNotFound {
1818 session_id: self.session_id.to_string(),
1819 }),
1820 status => {
1821 let error_body = response.text().await.unwrap_or_default();
1822 Err(QueueError::ProviderError {
1823 provider: "AzureServiceBus".to_string(),
1824 code: status.as_str().to_string(),
1825 message: format!("Session lock renewal failed: {}", error_body),
1826 })
1827 }
1828 }
1829 }
1830
1831 async fn close_session(&self) -> Result<(), QueueError> {
1841 self.lock_tokens.write().await.clear();
1842 Ok(())
1843 }
1844
1845 fn session_id(&self) -> &SessionId {
1846 &self.session_id
1847 }
1848
1849 fn session_expires_at(&self) -> Timestamp {
1850 self.session_expires_at
1851 .read()
1852 .map(|guard| *guard)
1853 .unwrap_or_else(|_| {
1854 Timestamp::from_datetime(Utc::now() - Duration::seconds(1))
1858 })
1859 }
1860}