Skip to main content

alien_bindings/providers/queue/
aws_sqs.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_aws_clients::sqs::{
6    DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, SqsApi, SqsClient,
7};
8use alien_error::{Context, ContextError, IntoAlienError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AwsSqsQueue {
13    queue_url: String,
14    client: SqsClient,
15}
16
17impl Debug for AwsSqsQueue {
18    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19        f.debug_struct("AwsSqsQueue")
20            .field("queue_url", &self.queue_url)
21            .finish()
22    }
23}
24
25impl AwsSqsQueue {
26    pub async fn new(
27        queue_url: String,
28        aws_config: alien_aws_clients::AwsClientConfig,
29    ) -> Result<Self> {
30        Ok(Self {
31            queue_url,
32            client: SqsClient::new(crate::http_client::create_http_client(), aws_config),
33        })
34    }
35}
36
37impl Binding for AwsSqsQueue {}
38
39#[async_trait]
40impl Queue for AwsSqsQueue {
41    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
42        let (body, _ct) = match message {
43            MessagePayload::Json(v) => (
44                serde_json::to_string(&v).into_alien_error().context(
45                    ErrorData::BindingSetupFailed {
46                        binding_type: "queue.sqs".to_string(),
47                        reason: "Failed to serialize JSON payload".to_string(),
48                    },
49                )?,
50                "application/json".to_string(),
51            ),
52            MessagePayload::Text(s) => (s, "text/plain; charset=utf-8".to_string()),
53        };
54
55        // Client-side validation: check message size
56        if body.len() > MAX_MESSAGE_BYTES {
57            return Err(alien_error::AlienError::new(
58                ErrorData::BindingSetupFailed {
59                    binding_type: "queue.sqs".to_string(),
60                    reason: format!(
61                        "Message size {} bytes exceeds limit of {} bytes",
62                        body.len(),
63                        MAX_MESSAGE_BYTES
64                    ),
65                },
66            ));
67        }
68
69        let req = SendMessageRequest::builder().message_body(body).build();
70        self.client
71            .send_message(&self.queue_url, req)
72            .await
73            .map(|_| ())
74            .map_err(|e| {
75                e.context(ErrorData::BindingSetupFailed {
76                    binding_type: "queue.sqs".to_string(),
77                    reason: "Failed to send message".to_string(),
78                })
79            })
80    }
81
82    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
83        // Client-side validation: check batch size
84        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
85            return Err(alien_error::AlienError::new(
86                ErrorData::BindingSetupFailed {
87                    binding_type: "queue.sqs".to_string(),
88                    reason: format!(
89                        "Batch size {} is invalid. Must be between 1 and {}",
90                        max_messages, MAX_BATCH_SIZE
91                    ),
92                },
93            ));
94        }
95
96        let req = ReceiveMessageRequest::builder()
97            .maybe_max_number_of_messages(Some(max_messages as i32))
98            .maybe_wait_time_seconds(Some(20))
99            .build();
100        let resp = self
101            .client
102            .receive_message(&self.queue_url, req)
103            .await
104            .context(ErrorData::BindingSetupFailed {
105                binding_type: "queue.sqs".to_string(),
106                reason: "Failed to receive".to_string(),
107            })?;
108        let msgs = resp
109            .receive_message_result
110            .messages
111            .into_iter()
112            .map(|m| {
113                let raw = m.body;
114                let payload = serde_json::from_str::<serde_json::Value>(&raw)
115                    .map(MessagePayload::Json)
116                    .unwrap_or(MessagePayload::Text(raw));
117                QueueMessage {
118                    payload,
119                    receipt_handle: m.receipt_handle,
120                }
121            })
122            .collect();
123        Ok(msgs)
124    }
125
126    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
127        let req = DeleteMessageRequest::builder()
128            .receipt_handle(receipt_handle.to_string())
129            .build();
130        self.client
131            .delete_message(&self.queue_url, req)
132            .await
133            .context(ErrorData::BindingSetupFailed {
134                binding_type: "queue.sqs".to_string(),
135                reason: "Failed to delete message".to_string(),
136            })
137    }
138}