1use crate::sqs::DeadLetterQueue;
4use std::fmt;
5
6#[derive(Debug)]
8pub enum SendError {
9 BuildEntryFailed(String),
11 AwsSdkError(
13 Box<
14 aws_sdk_sqs::error::SdkError<
15 aws_sdk_sqs::operation::send_message_batch::SendMessageBatchError,
16 >,
17 >,
18 ),
19}
20
21impl fmt::Display for SendError {
22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 match self {
24 SendError::BuildEntryFailed(msg) => write!(f, "failed to build message entry: {}", msg),
25 SendError::AwsSdkError(e) => write!(f, "AWS SDK error: {}", e),
26 }
27 }
28}
29
30impl std::error::Error for SendError {}
31
32impl DeadLetterQueue {
33 pub async fn send_batch(
73 &self,
74 queue_url: &str,
75 messages: &[impl AsRef<str>],
76 ) -> Result<(), SendError> {
77 if messages.is_empty() {
79 return Ok(());
80 }
81
82 let entries: Result<Vec<_>, SendError> = messages
83 .iter()
84 .map(|message| {
85 let id = uuid::Uuid::new_v4().to_string();
86 let body = message.as_ref().to_string();
87 aws_sdk_sqs::types::SendMessageBatchRequestEntry::builder()
88 .id(id)
89 .message_body(body)
90 .build()
91 .map_err(|e| SendError::BuildEntryFailed(e.to_string()))
92 })
93 .collect();
94
95 let entries = entries?;
96
97 self.client
98 .send_message_batch()
99 .queue_url(queue_url)
100 .set_entries(Some(entries))
101 .send()
102 .await
103 .map_err(|e| SendError::AwsSdkError(Box::new(e)))?;
104
105 Ok(())
106 }
107}
108
#[cfg(test)]
mod tests {
    use crate::test_utils::TestEnv;

    /// Sends three messages in one batch and checks that all three bodies
    /// come back from a single receive call on the same queue.
    #[tokio::test]
    async fn test_send_batch_success() {
        let env = TestEnv::new(None).await.unwrap();
        let queue_name = env.create_sqs_queue("test-batch").await.unwrap();
        let queue_url = env.queue_url(&queue_name);
        let dlq = env.dlq();

        let messages = vec!["message1", "message2", "message3"];
        let outcome = dlq.send_batch(&queue_url, &messages).await;
        assert!(outcome.is_ok(), "send_batch should succeed");

        // Read back up to ten messages; a missing message list counts as empty.
        let received = env
            .client()
            .receive_message()
            .queue_url(&queue_url)
            .max_number_of_messages(10)
            .send()
            .await
            .unwrap()
            .messages
            .unwrap_or_default();
        assert_eq!(received.len(), 3, "Should receive 3 messages");

        // Delivery order is not asserted, only that every body is present.
        let bodies: Vec<&str> = received.iter().map(|m| m.body().unwrap_or("")).collect();
        for expected in &messages {
            assert!(bodies.contains(expected));
        }
    }

    /// An empty batch must return `Ok`.
    #[tokio::test]
    async fn test_send_batch_empty() {
        let env = TestEnv::new(None).await.unwrap();
        let queue_name = env.create_sqs_queue("test-empty").await.unwrap();
        let queue_url = env.queue_url(&queue_name);
        let dlq = env.dlq();

        let no_messages: Vec<String> = Vec::new();
        let outcome = dlq.send_batch(&queue_url, &no_messages).await;

        assert!(outcome.is_ok(), "send_batch should handle empty batch");
    }
}