dlq/
send.rs

1//! Batch message sending functionality for SQS.
2
3use crate::sqs::DeadLetterQueue;
4use std::fmt;
5
6/// Errors that can occur when sending messages to SQS.
7#[derive(Debug)]
8pub enum SendError {
9    /// Failed to build a valid SQS message entry.
10    BuildEntryFailed(String),
11    /// An error occurred in the AWS SDK.
12    AwsSdkError(
13        Box<
14            aws_sdk_sqs::error::SdkError<
15                aws_sdk_sqs::operation::send_message_batch::SendMessageBatchError,
16            >,
17        >,
18    ),
19}
20
21impl fmt::Display for SendError {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        match self {
24            SendError::BuildEntryFailed(msg) => write!(f, "failed to build message entry: {}", msg),
25            SendError::AwsSdkError(e) => write!(f, "AWS SDK error: {}", e),
26        }
27    }
28}
29
30impl std::error::Error for SendError {}
31
32impl DeadLetterQueue {
33    /// Sends multiple messages to SQS in a single batch request.
34    ///
35    /// Uses the SQS `SendMessageBatch` API to efficiently send up to 10 messages
36    /// at once. Each message is assigned a unique UUID as its batch entry ID.
37    ///
38    /// # Arguments
39    ///
40    /// * `queue_url` - The URL of the SQS queue to send messages to
41    /// * `messages` - A slice of message bodies to send. Each item must implement
42    ///   `AsRef<str>`.
43    ///
44    /// # Returns
45    ///
46    /// * `Ok(())` - All messages were sent successfully
47    /// * `Err(SendError)` - An error occurred during the send operation
48    ///
49    /// # Errors
50    ///
51    /// - `SendError::BuildEntryFailed` - A message entry couldn't be constructed
52    /// - `SendError::AwsSdkError` - The SQS API returned an error
53    ///
54    /// # Example
55    ///
56    /// ```no_run
57    /// use dlq::DeadLetterQueue;
58    ///
59    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60    /// let config = aws_config::from_env().load().await;
61    /// let dlq = DeadLetterQueue::from_config(config);
62    ///
63    /// // Send multiple messages
64    /// dlq.send_batch("https://sqs.us-east-1.amazonaws.com/123/my-queue", &["message 1", "message 2", "message 3"]).await?;
65    ///
66    /// // Empty batch is handled gracefully
67    /// let empty: Vec<String> = vec![];
68    /// dlq.send_batch("https://sqs.us-east-1.amazonaws.com/123/my-queue", &empty).await?;
69    /// # Ok(())
70    /// # }
71    /// ```
72    pub async fn send_batch(
73        &self,
74        queue_url: &str,
75        messages: &[impl AsRef<str>],
76    ) -> Result<(), SendError> {
77        // Early return for empty batches - SQS doesn't allow empty batch requests
78        if messages.is_empty() {
79            return Ok(());
80        }
81
82        let entries: Result<Vec<_>, SendError> = messages
83            .iter()
84            .map(|message| {
85                let id = uuid::Uuid::new_v4().to_string();
86                let body = message.as_ref().to_string();
87                aws_sdk_sqs::types::SendMessageBatchRequestEntry::builder()
88                    .id(id)
89                    .message_body(body)
90                    .build()
91                    .map_err(|e| SendError::BuildEntryFailed(e.to_string()))
92            })
93            .collect();
94
95        let entries = entries?;
96
97        self.client
98            .send_message_batch()
99            .queue_url(queue_url)
100            .set_entries(Some(entries))
101            .send()
102            .await
103            .map_err(|e| SendError::AwsSdkError(Box::new(e)))?;
104
105        Ok(())
106    }
107}
108
#[cfg(test)]
mod tests {
    use crate::test_utils::TestEnv;

    /// Sends three messages in one batch and verifies that each body can be
    /// received back from the queue.
    #[tokio::test]
    async fn test_send_batch_success() {
        // Upper bound on receive calls so the test cannot loop forever.
        const MAX_RECEIVE_ATTEMPTS: usize = 5;

        let env = TestEnv::new(None).await.unwrap();
        let queue_name = env.create_sqs_queue("test-batch").await.unwrap();
        let queue_url = env.queue_url(&queue_name);
        let dlq = env.dlq();

        let messages = vec!["message1", "message2", "message3"];
        let result = dlq.send_batch(&queue_url, &messages).await;

        assert!(result.is_ok(), "send_batch should succeed");

        // Verify messages were actually sent by receiving them. A single
        // ReceiveMessage call is not guaranteed to return every queued message,
        // so keep receiving (with a short long-poll wait) until all expected
        // messages have arrived or the attempt budget is exhausted.
        let mut received_bodies: Vec<String> = Vec::new();
        for _ in 0..MAX_RECEIVE_ATTEMPTS {
            if received_bodies.len() >= messages.len() {
                break;
            }

            let receive_output = env
                .client()
                .receive_message()
                .queue_url(&queue_url)
                .max_number_of_messages(10)
                .wait_time_seconds(1)
                .send()
                .await
                .unwrap();

            received_bodies.extend(
                receive_output
                    .messages
                    .unwrap_or_default()
                    .into_iter()
                    .map(|m| m.body.unwrap_or_default()),
            );
        }

        assert_eq!(received_bodies.len(), 3, "Should receive 3 messages");

        for expected in &messages {
            assert!(
                received_bodies.iter().any(|body| body.as_str() == *expected),
                "missing message body: {}",
                expected
            );
        }
    }

    /// An empty slice must be accepted and reported as success.
    #[tokio::test]
    async fn test_send_batch_empty() {
        let env = TestEnv::new(None).await.unwrap();
        let queue_name = env.create_sqs_queue("test-empty").await.unwrap();
        let queue_url = env.queue_url(&queue_name);
        let dlq = env.dlq();

        let messages: Vec<String> = vec![];
        let result = dlq.send_batch(&queue_url, &messages).await;

        assert!(result.is_ok(), "send_batch should handle empty batch");
    }
}
160}