alien_bindings/providers/queue/
aws_sqs.rs1use crate::error::{ErrorData, Result};
2use crate::traits::{
3 Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_aws_clients::sqs::{
6 DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, SqsApi, SqsClient,
7};
8use alien_error::{Context, ContextError, IntoAlienError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AwsSqsQueue {
13 queue_url: String,
14 client: SqsClient,
15}
16
17impl Debug for AwsSqsQueue {
18 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19 f.debug_struct("AwsSqsQueue")
20 .field("queue_url", &self.queue_url)
21 .finish()
22 }
23}
24
25impl AwsSqsQueue {
26 pub fn new(queue_url: String, client: SqsClient) -> Self {
27 Self { queue_url, client }
28 }
29}
30
31impl Binding for AwsSqsQueue {}
32
33#[async_trait]
34impl Queue for AwsSqsQueue {
35 async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
36 let (body, _ct) = match message {
37 MessagePayload::Json(v) => (
38 serde_json::to_string(&v).into_alien_error().context(
39 ErrorData::BindingSetupFailed {
40 binding_type: "queue.sqs".to_string(),
41 reason: "Failed to serialize JSON payload".to_string(),
42 },
43 )?,
44 "application/json".to_string(),
45 ),
46 MessagePayload::Text(s) => (s, "text/plain; charset=utf-8".to_string()),
47 };
48
49 if body.len() > MAX_MESSAGE_BYTES {
51 return Err(alien_error::AlienError::new(
52 ErrorData::BindingSetupFailed {
53 binding_type: "queue.sqs".to_string(),
54 reason: format!(
55 "Message size {} bytes exceeds limit of {} bytes",
56 body.len(),
57 MAX_MESSAGE_BYTES
58 ),
59 },
60 ));
61 }
62
63 let req = SendMessageRequest::builder().message_body(body).build();
64 self.client
65 .send_message(&self.queue_url, req)
66 .await
67 .map(|_| ())
68 .map_err(|e| {
69 e.context(ErrorData::BindingSetupFailed {
70 binding_type: "queue.sqs".to_string(),
71 reason: "Failed to send message".to_string(),
72 })
73 })
74 }
75
76 async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
77 if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
79 return Err(alien_error::AlienError::new(
80 ErrorData::BindingSetupFailed {
81 binding_type: "queue.sqs".to_string(),
82 reason: format!(
83 "Batch size {} is invalid. Must be between 1 and {}",
84 max_messages, MAX_BATCH_SIZE
85 ),
86 },
87 ));
88 }
89
90 let req = ReceiveMessageRequest::builder()
91 .maybe_max_number_of_messages(Some(max_messages as i32))
92 .maybe_wait_time_seconds(Some(20))
93 .build();
94 let resp = self
95 .client
96 .receive_message(&self.queue_url, req)
97 .await
98 .context(ErrorData::BindingSetupFailed {
99 binding_type: "queue.sqs".to_string(),
100 reason: "Failed to receive".to_string(),
101 })?;
102 let msgs = resp
103 .receive_message_result
104 .messages
105 .into_iter()
106 .map(|m| {
107 let raw = m.body;
108 let payload = serde_json::from_str::<serde_json::Value>(&raw)
109 .map(MessagePayload::Json)
110 .unwrap_or(MessagePayload::Text(raw));
111 QueueMessage {
112 payload,
113 receipt_handle: m.receipt_handle,
114 }
115 })
116 .collect();
117 Ok(msgs)
118 }
119
120 async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
121 let req = DeleteMessageRequest::builder()
122 .receipt_handle(receipt_handle.to_string())
123 .build();
124 self.client
125 .delete_message(&self.queue_url, req)
126 .await
127 .context(ErrorData::BindingSetupFailed {
128 binding_type: "queue.sqs".to_string(),
129 reason: "Failed to delete message".to_string(),
130 })
131 }
132}