Skip to main content

alien_bindings/providers/queue/
azure_service_bus.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_azure_clients::service_bus::{
6    AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
7};
8use alien_error::{Context, ContextError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AzureServiceBusQueue {
13    namespace: String,
14    queue_name: String,
15    client: AzureServiceBusDataPlaneClient,
16}
17
18impl Debug for AzureServiceBusQueue {
19    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
20        f.debug_struct("AzureServiceBusQueue")
21            .field("namespace", &self.namespace)
22            .field("queue_name", &self.queue_name)
23            .finish()
24    }
25}
26
27impl AzureServiceBusQueue {
28    pub async fn new(
29        namespace: String,
30        queue_name: String,
31        azure_config: alien_azure_clients::AzureClientConfig,
32    ) -> Result<Self> {
33        Ok(Self {
34            namespace,
35            queue_name,
36            client: AzureServiceBusDataPlaneClient::new(
37                crate::http_client::create_http_client(),
38                azure_config,
39            ),
40        })
41    }
42}
43
44impl Binding for AzureServiceBusQueue {}
45
46#[async_trait]
47impl Queue for AzureServiceBusQueue {
48    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
49        let (body, _ct) = match message {
50            MessagePayload::Json(v) => (
51                serde_json::to_string(&v).unwrap(),
52                Some("application/json".to_string()),
53            ),
54            MessagePayload::Text(s) => (s, Some("text/plain; charset=utf-8".to_string())),
55        };
56
57        // Client-side validation: check message size
58        if body.len() > MAX_MESSAGE_BYTES {
59            return Err(alien_error::AlienError::new(
60                ErrorData::BindingSetupFailed {
61                    binding_type: "queue.servicebus".to_string(),
62                    reason: format!(
63                        "Message size {} bytes exceeds limit of {} bytes",
64                        body.len(),
65                        MAX_MESSAGE_BYTES
66                    ),
67                },
68            ));
69        }
70        let params = SendMessageParameters {
71            body,
72            broker_properties: None,
73            custom_properties: std::collections::HashMap::new(),
74        };
75        self.client
76            .send_message(self.namespace.clone(), self.queue_name.clone(), params)
77            .await
78            .context(ErrorData::BindingSetupFailed {
79                binding_type: "queue.servicebus".to_string(),
80                reason: "Failed to send".to_string(),
81            })
82    }
83
84    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
85        // Client-side validation: check batch size
86        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
87            return Err(alien_error::AlienError::new(
88                ErrorData::BindingSetupFailed {
89                    binding_type: "queue.servicebus".to_string(),
90                    reason: format!(
91                        "Batch size {} is invalid. Must be between 1 and {}",
92                        max_messages, MAX_BATCH_SIZE
93                    ),
94                },
95            ));
96        }
97
98        let mut messages = Vec::new();
99
100        // Azure Service Bus typically receives one message at a time with peek-lock
101        // We'll loop up to max_messages times to get multiple messages
102        for _ in 0..std::cmp::min(max_messages, MAX_BATCH_SIZE) {
103            match self
104                .client
105                .peek_lock(
106                    self.namespace.clone(),
107                    self.queue_name.clone(),
108                    Some(30), // 30 second timeout
109                )
110                .await
111            {
112                Ok(Some(received_msg)) => {
113                    let body = received_msg.body.clone();
114                    let payload = match serde_json::from_str::<serde_json::Value>(&body) {
115                        Ok(json_value) => MessagePayload::Json(json_value),
116                        Err(_) => MessagePayload::Text(body),
117                    };
118
119                    // Use lock token as receipt handle for acknowledgment
120                    let receipt_handle = received_msg
121                        .broker_properties
122                        .as_ref()
123                        .and_then(|bp| bp.lock_token.clone())
124                        .ok_or_else(|| {
125                            alien_error::AlienError::new(ErrorData::BindingSetupFailed {
126                                binding_type: "queue.servicebus".to_string(),
127                                reason: "Received message without lock token".to_string(),
128                            })
129                        })?;
130
131                    messages.push(QueueMessage {
132                        payload,
133                        receipt_handle,
134                    });
135                }
136                Ok(None) => {
137                    // No more messages available
138                    break;
139                }
140                Err(e) => {
141                    return Err(e.context(ErrorData::BindingSetupFailed {
142                        binding_type: "queue.servicebus".to_string(),
143                        reason: "Failed to receive message".to_string(),
144                    }));
145                }
146            }
147        }
148
149        Ok(messages)
150    }
151
152    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
153        // For Azure Service Bus, receipt_handle is the lock token
154        // We use the lock token as both message ID and lock token for the complete_message call
155        self.client
156            .complete_message(
157                self.namespace.clone(),
158                self.queue_name.clone(),
159                receipt_handle.to_string(), // message_id
160                receipt_handle.to_string(), // lock_token
161            )
162            .await
163            .context(ErrorData::BindingSetupFailed {
164                binding_type: "queue.servicebus".to_string(),
165                reason: "Failed to complete message".to_string(),
166            })
167    }
168}