// alien_bindings/providers/queue/aws_sqs.rs
use crate::error::{ErrorData, Result};
2use crate::traits::{
3 Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_aws_clients::sqs::{
6 DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, SqsApi, SqsClient,
7};
8use alien_error::{Context, ContextError, IntoAlienError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AwsSqsQueue {
13 queue_url: String,
14 client: SqsClient,
15}
16
17impl Debug for AwsSqsQueue {
18 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19 f.debug_struct("AwsSqsQueue")
20 .field("queue_url", &self.queue_url)
21 .finish()
22 }
23}
24
25impl AwsSqsQueue {
26 pub async fn new(
27 queue_url: String,
28 aws_config: alien_aws_clients::AwsClientConfig,
29 ) -> Result<Self> {
30 Ok(Self {
31 queue_url,
32 client: SqsClient::new(crate::http_client::create_http_client(), aws_config),
33 })
34 }
35}
36
37impl Binding for AwsSqsQueue {}
38
39#[async_trait]
40impl Queue for AwsSqsQueue {
41 async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
42 let (body, _ct) = match message {
43 MessagePayload::Json(v) => (
44 serde_json::to_string(&v).into_alien_error().context(
45 ErrorData::BindingSetupFailed {
46 binding_type: "queue.sqs".to_string(),
47 reason: "Failed to serialize JSON payload".to_string(),
48 },
49 )?,
50 "application/json".to_string(),
51 ),
52 MessagePayload::Text(s) => (s, "text/plain; charset=utf-8".to_string()),
53 };
54
55 if body.len() > MAX_MESSAGE_BYTES {
57 return Err(alien_error::AlienError::new(
58 ErrorData::BindingSetupFailed {
59 binding_type: "queue.sqs".to_string(),
60 reason: format!(
61 "Message size {} bytes exceeds limit of {} bytes",
62 body.len(),
63 MAX_MESSAGE_BYTES
64 ),
65 },
66 ));
67 }
68
69 let req = SendMessageRequest::builder().message_body(body).build();
70 self.client
71 .send_message(&self.queue_url, req)
72 .await
73 .map(|_| ())
74 .map_err(|e| {
75 e.context(ErrorData::BindingSetupFailed {
76 binding_type: "queue.sqs".to_string(),
77 reason: "Failed to send message".to_string(),
78 })
79 })
80 }
81
82 async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
83 if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
85 return Err(alien_error::AlienError::new(
86 ErrorData::BindingSetupFailed {
87 binding_type: "queue.sqs".to_string(),
88 reason: format!(
89 "Batch size {} is invalid. Must be between 1 and {}",
90 max_messages, MAX_BATCH_SIZE
91 ),
92 },
93 ));
94 }
95
96 let req = ReceiveMessageRequest::builder()
97 .maybe_max_number_of_messages(Some(max_messages as i32))
98 .maybe_wait_time_seconds(Some(20))
99 .build();
100 let resp = self
101 .client
102 .receive_message(&self.queue_url, req)
103 .await
104 .context(ErrorData::BindingSetupFailed {
105 binding_type: "queue.sqs".to_string(),
106 reason: "Failed to receive".to_string(),
107 })?;
108 let msgs = resp
109 .receive_message_result
110 .messages
111 .into_iter()
112 .map(|m| {
113 let raw = m.body;
114 let payload = serde_json::from_str::<serde_json::Value>(&raw)
115 .map(MessagePayload::Json)
116 .unwrap_or(MessagePayload::Text(raw));
117 QueueMessage {
118 payload,
119 receipt_handle: m.receipt_handle,
120 }
121 })
122 .collect();
123 Ok(msgs)
124 }
125
126 async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
127 let req = DeleteMessageRequest::builder()
128 .receipt_handle(receipt_handle.to_string())
129 .build();
130 self.client
131 .delete_message(&self.queue_url, req)
132 .await
133 .context(ErrorData::BindingSetupFailed {
134 binding_type: "queue.sqs".to_string(),
135 reason: "Failed to delete message".to_string(),
136 })
137 }
138}