Skip to main content

alien_bindings/providers/queue/
aws_sqs.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_aws_clients::sqs::{
6    DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, SqsApi, SqsClient,
7};
8use alien_error::{Context, ContextError, IntoAlienError};
9use async_trait::async_trait;
10use std::fmt::{Debug, Formatter};
11
12pub struct AwsSqsQueue {
13    queue_url: String,
14    client: SqsClient,
15}
16
17impl Debug for AwsSqsQueue {
18    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19        f.debug_struct("AwsSqsQueue")
20            .field("queue_url", &self.queue_url)
21            .finish()
22    }
23}
24
25impl AwsSqsQueue {
26    pub fn new(queue_url: String, client: SqsClient) -> Self {
27        Self { queue_url, client }
28    }
29}
30
31impl Binding for AwsSqsQueue {}
32
33#[async_trait]
34impl Queue for AwsSqsQueue {
35    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
36        let (body, _ct) = match message {
37            MessagePayload::Json(v) => (
38                serde_json::to_string(&v).into_alien_error().context(
39                    ErrorData::BindingSetupFailed {
40                        binding_type: "queue.sqs".to_string(),
41                        reason: "Failed to serialize JSON payload".to_string(),
42                    },
43                )?,
44                "application/json".to_string(),
45            ),
46            MessagePayload::Text(s) => (s, "text/plain; charset=utf-8".to_string()),
47        };
48
49        // Client-side validation: check message size
50        if body.len() > MAX_MESSAGE_BYTES {
51            return Err(alien_error::AlienError::new(
52                ErrorData::BindingSetupFailed {
53                    binding_type: "queue.sqs".to_string(),
54                    reason: format!(
55                        "Message size {} bytes exceeds limit of {} bytes",
56                        body.len(),
57                        MAX_MESSAGE_BYTES
58                    ),
59                },
60            ));
61        }
62
63        let req = SendMessageRequest::builder().message_body(body).build();
64        self.client
65            .send_message(&self.queue_url, req)
66            .await
67            .map(|_| ())
68            .map_err(|e| {
69                e.context(ErrorData::BindingSetupFailed {
70                    binding_type: "queue.sqs".to_string(),
71                    reason: "Failed to send message".to_string(),
72                })
73            })
74    }
75
76    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
77        // Client-side validation: check batch size
78        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
79            return Err(alien_error::AlienError::new(
80                ErrorData::BindingSetupFailed {
81                    binding_type: "queue.sqs".to_string(),
82                    reason: format!(
83                        "Batch size {} is invalid. Must be between 1 and {}",
84                        max_messages, MAX_BATCH_SIZE
85                    ),
86                },
87            ));
88        }
89
90        let req = ReceiveMessageRequest::builder()
91            .maybe_max_number_of_messages(Some(max_messages as i32))
92            .maybe_wait_time_seconds(Some(20))
93            .build();
94        let resp = self
95            .client
96            .receive_message(&self.queue_url, req)
97            .await
98            .context(ErrorData::BindingSetupFailed {
99                binding_type: "queue.sqs".to_string(),
100                reason: "Failed to receive".to_string(),
101            })?;
102        let msgs = resp
103            .receive_message_result
104            .messages
105            .into_iter()
106            .map(|m| {
107                let raw = m.body;
108                let payload = serde_json::from_str::<serde_json::Value>(&raw)
109                    .map(MessagePayload::Json)
110                    .unwrap_or(MessagePayload::Text(raw));
111                QueueMessage {
112                    payload,
113                    receipt_handle: m.receipt_handle,
114                }
115            })
116            .collect();
117        Ok(msgs)
118    }
119
120    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
121        let req = DeleteMessageRequest::builder()
122            .receipt_handle(receipt_handle.to_string())
123            .build();
124        self.client
125            .delete_message(&self.queue_url, req)
126            .await
127            .context(ErrorData::BindingSetupFailed {
128                binding_type: "queue.sqs".to_string(),
129                reason: "Failed to delete message".to_string(),
130            })
131    }
132}