alien_bindings/providers/queue/
azure_service_bus.rs1use crate::error::{ErrorData, Result};
2use crate::traits::{
3 Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_azure_clients::service_bus::{
6 AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
7};
8use alien_error::{Context, ContextError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AzureServiceBusQueue {
13 namespace: String,
14 queue_name: String,
15 client: AzureServiceBusDataPlaneClient,
16}
17
18impl Debug for AzureServiceBusQueue {
19 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
20 f.debug_struct("AzureServiceBusQueue")
21 .field("namespace", &self.namespace)
22 .field("queue_name", &self.queue_name)
23 .finish()
24 }
25}
26
27impl AzureServiceBusQueue {
28 pub async fn new(
29 namespace: String,
30 queue_name: String,
31 azure_config: alien_azure_clients::AzureClientConfig,
32 ) -> Result<Self> {
33 Ok(Self {
34 namespace,
35 queue_name,
36 client: AzureServiceBusDataPlaneClient::new(
37 crate::http_client::create_http_client(),
38 azure_config,
39 ),
40 })
41 }
42}
43
44impl Binding for AzureServiceBusQueue {}
45
46#[async_trait]
47impl Queue for AzureServiceBusQueue {
48 async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
49 let (body, _ct) = match message {
50 MessagePayload::Json(v) => (
51 serde_json::to_string(&v).unwrap(),
52 Some("application/json".to_string()),
53 ),
54 MessagePayload::Text(s) => (s, Some("text/plain; charset=utf-8".to_string())),
55 };
56
57 if body.len() > MAX_MESSAGE_BYTES {
59 return Err(alien_error::AlienError::new(
60 ErrorData::BindingSetupFailed {
61 binding_type: "queue.servicebus".to_string(),
62 reason: format!(
63 "Message size {} bytes exceeds limit of {} bytes",
64 body.len(),
65 MAX_MESSAGE_BYTES
66 ),
67 },
68 ));
69 }
70 let params = SendMessageParameters {
71 body,
72 broker_properties: None,
73 custom_properties: std::collections::HashMap::new(),
74 };
75 self.client
76 .send_message(self.namespace.clone(), self.queue_name.clone(), params)
77 .await
78 .context(ErrorData::BindingSetupFailed {
79 binding_type: "queue.servicebus".to_string(),
80 reason: "Failed to send".to_string(),
81 })
82 }
83
84 async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
85 if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
87 return Err(alien_error::AlienError::new(
88 ErrorData::BindingSetupFailed {
89 binding_type: "queue.servicebus".to_string(),
90 reason: format!(
91 "Batch size {} is invalid. Must be between 1 and {}",
92 max_messages, MAX_BATCH_SIZE
93 ),
94 },
95 ));
96 }
97
98 let mut messages = Vec::new();
99
100 for _ in 0..std::cmp::min(max_messages, MAX_BATCH_SIZE) {
103 match self
104 .client
105 .peek_lock(
106 self.namespace.clone(),
107 self.queue_name.clone(),
108 Some(30), )
110 .await
111 {
112 Ok(Some(received_msg)) => {
113 let body = received_msg.body.clone();
114 let payload = match serde_json::from_str::<serde_json::Value>(&body) {
115 Ok(json_value) => MessagePayload::Json(json_value),
116 Err(_) => MessagePayload::Text(body),
117 };
118
119 let receipt_handle = received_msg
121 .broker_properties
122 .as_ref()
123 .and_then(|bp| bp.lock_token.clone())
124 .ok_or_else(|| {
125 alien_error::AlienError::new(ErrorData::BindingSetupFailed {
126 binding_type: "queue.servicebus".to_string(),
127 reason: "Received message without lock token".to_string(),
128 })
129 })?;
130
131 messages.push(QueueMessage {
132 payload,
133 receipt_handle,
134 });
135 }
136 Ok(None) => {
137 break;
139 }
140 Err(e) => {
141 return Err(e.context(ErrorData::BindingSetupFailed {
142 binding_type: "queue.servicebus".to_string(),
143 reason: "Failed to receive message".to_string(),
144 }));
145 }
146 }
147 }
148
149 Ok(messages)
150 }
151
152 async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
153 self.client
156 .complete_message(
157 self.namespace.clone(),
158 self.queue_name.clone(),
159 receipt_handle.to_string(), receipt_handle.to_string(), )
162 .await
163 .context(ErrorData::BindingSetupFailed {
164 binding_type: "queue.servicebus".to_string(),
165 reason: "Failed to complete message".to_string(),
166 })
167 }
168}