Skip to main content

alien_bindings/providers/queue/
azure_service_bus.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_azure_clients::service_bus::{
6    AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
7};
8use alien_error::{Context, ContextError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AzureServiceBusQueue {
13    namespace: String,
14    queue_name: String,
15    client: AzureServiceBusDataPlaneClient,
16}
17
18impl Debug for AzureServiceBusQueue {
19    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
20        f.debug_struct("AzureServiceBusQueue")
21            .field("namespace", &self.namespace)
22            .field("queue_name", &self.queue_name)
23            .finish()
24    }
25}
26
27impl AzureServiceBusQueue {
28    pub async fn new(
29        namespace: String,
30        queue_name: String,
31        azure_config: alien_azure_clients::AzureClientConfig,
32    ) -> Result<Self> {
33        Ok(Self {
34            namespace,
35            queue_name,
36            client: AzureServiceBusDataPlaneClient::new(
37                crate::http_client::create_http_client(),
38                alien_azure_clients::AzureTokenCache::new(azure_config),
39            ),
40        })
41    }
42}
43
44impl Binding for AzureServiceBusQueue {}
45
46#[async_trait]
47impl Queue for AzureServiceBusQueue {
48    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
49        let (body, _ct) = match message {
50            MessagePayload::Json(v) => (
51                serde_json::to_string(&v).unwrap(),
52                Some("application/json".to_string()),
53            ),
54            MessagePayload::Text(s) => (s, Some("text/plain; charset=utf-8".to_string())),
55        };
56
57        // Client-side validation: check message size
58        if body.len() > MAX_MESSAGE_BYTES {
59            return Err(alien_error::AlienError::new(
60                ErrorData::BindingSetupFailed {
61                    binding_type: "queue.servicebus".to_string(),
62                    reason: format!(
63                        "Message size {} bytes exceeds limit of {} bytes",
64                        body.len(),
65                        MAX_MESSAGE_BYTES
66                    ),
67                },
68            ));
69        }
70        let params = SendMessageParameters {
71            body,
72            broker_properties: None,
73            custom_properties: std::collections::HashMap::new(),
74        };
75        self.client
76            .send_message(self.namespace.clone(), self.queue_name.clone(), params)
77            .await
78            .context(ErrorData::BindingSetupFailed {
79                binding_type: "queue.servicebus".to_string(),
80                reason: "Failed to send".to_string(),
81            })
82    }
83
84    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
85        // Client-side validation: check batch size
86        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
87            return Err(alien_error::AlienError::new(
88                ErrorData::BindingSetupFailed {
89                    binding_type: "queue.servicebus".to_string(),
90                    reason: format!(
91                        "Batch size {} is invalid. Must be between 1 and {}",
92                        max_messages, MAX_BATCH_SIZE
93                    ),
94                },
95            ));
96        }
97
98        let mut messages = Vec::new();
99
100        // Azure Service Bus typically receives one message at a time with peek-lock
101        // We'll loop up to max_messages times to get multiple messages
102        for _ in 0..std::cmp::min(max_messages, MAX_BATCH_SIZE) {
103            match self
104                .client
105                .peek_lock(
106                    self.namespace.clone(),
107                    self.queue_name.clone(),
108                    Some(30), // 30 second timeout
109                )
110                .await
111            {
112                Ok(Some(received_msg)) => {
113                    let body = received_msg.body.clone();
114                    let payload = match serde_json::from_str::<serde_json::Value>(&body) {
115                        Ok(json_value) => MessagePayload::Json(json_value),
116                        Err(_) => MessagePayload::Text(body),
117                    };
118
119                    // Encode both message_id and lock_token in receipt_handle for ack()
120                    let broker_props =
121                        received_msg.broker_properties.as_ref().ok_or_else(|| {
122                            alien_error::AlienError::new(ErrorData::BindingSetupFailed {
123                                binding_type: "queue.servicebus".to_string(),
124                                reason: "Received message without broker properties".to_string(),
125                            })
126                        })?;
127                    let message_id = broker_props.message_id.as_deref().ok_or_else(|| {
128                        alien_error::AlienError::new(ErrorData::BindingSetupFailed {
129                            binding_type: "queue.servicebus".to_string(),
130                            reason: "Received message without message ID".to_string(),
131                        })
132                    })?;
133                    let lock_token = broker_props.lock_token.as_deref().ok_or_else(|| {
134                        alien_error::AlienError::new(ErrorData::BindingSetupFailed {
135                            binding_type: "queue.servicebus".to_string(),
136                            reason: "Received message without lock token".to_string(),
137                        })
138                    })?;
139                    let receipt_handle = format!("{}\n{}", message_id, lock_token);
140
141                    messages.push(QueueMessage {
142                        payload,
143                        receipt_handle,
144                    });
145                }
146                Ok(None) => {
147                    // No more messages available
148                    break;
149                }
150                Err(e) => {
151                    return Err(e.context(ErrorData::BindingSetupFailed {
152                        binding_type: "queue.servicebus".to_string(),
153                        reason: "Failed to receive message".to_string(),
154                    }));
155                }
156            }
157        }
158
159        Ok(messages)
160    }
161
162    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
163        // receipt_handle encodes "message_id\nlock_token" (set in receive())
164        let (message_id, lock_token) = receipt_handle.split_once('\n').ok_or_else(|| {
165            alien_error::AlienError::new(ErrorData::BindingSetupFailed {
166                binding_type: "queue.servicebus".to_string(),
167                reason: "Invalid receipt handle format: expected message_id\\nlock_token"
168                    .to_string(),
169            })
170        })?;
171        self.client
172            .complete_message(
173                self.namespace.clone(),
174                self.queue_name.clone(),
175                message_id.to_string(),
176                lock_token.to_string(),
177            )
178            .await
179            .context(ErrorData::BindingSetupFailed {
180                binding_type: "queue.servicebus".to_string(),
181                reason: "Failed to complete message".to_string(),
182            })
183    }
184}