dlq/sqs.rs
1//! SQS client wrapper and message types for dead letter queue operations.
2
3use anyhow::Context;
4use aws_config::SdkConfig;
5use aws_sdk_sqs as sqs;
6use sqs::types::DeleteMessageBatchRequestEntry;
7
8/// Receives messages from an SQS queue.
9///
10/// Retrieves up to 10 messages at a time with a 15-second visibility timeout.
11/// Messages become invisible to other consumers during this period.
12///
13/// # Arguments
14///
15/// * `client` - The SQS client to use for the request
16/// * `queue_url` - The URL of the queue to receive messages from
17///
18/// # Returns
19///
20/// The raw SQS receive message output containing the messages and metadata.
21///
22/// # Errors
23///
24/// Returns an error if the SQS API call fails.
25pub async fn receive(
26 client: &aws_sdk_sqs::Client,
27 queue_url: &str,
28) -> anyhow::Result<aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput> {
29 let result = client
30 .receive_message()
31 .set_queue_url(Some(queue_url.to_string()))
32 .set_max_number_of_messages(Some(10))
33 .set_visibility_timeout(Some(15))
34 .message_system_attribute_names(sqs::types::MessageSystemAttributeName::All)
35 //.set_wait_time_seconds(Some(3))
36 .send()
37 .await;
38
39 result.context("failed to receive messages")
40}
41
42/// Client for interacting with AWS SQS dead letter queues.
43///
44/// Provides high-level operations for listing queues, polling messages,
45/// and clearing messages from queues.
46///
47/// # Example
48///
49/// ```no_run
50/// use dlq::DeadLetterQueue;
51///
52/// # async fn example() {
53/// let config = aws_config::from_env().load().await;
54/// let dlq = DeadLetterQueue::from_config(config);
55///
56/// // List all queues
57/// let queues = dlq.list().await;
58///
59/// // Poll messages from a queue
60/// dlq.poll("https://sqs.us-east-1.amazonaws.com/123456789/my-dlq").await;
61/// # }
62/// ```
63#[derive(Clone)]
64pub struct DeadLetterQueue {
65 /// The AWS SDK configuration used for SQS operations
66 pub config: SdkConfig,
67 /// The SQS client instance
68 pub client: sqs::Client,
69}
70
71impl DeadLetterQueue {
72 /// Creates a DeadLetterQueue from a pre-built AWS SDK config.
73 ///
74 /// This is the preferred constructor as it allows the caller to configure
75 /// credentials and endpoints based on their needs (e.g., `--local` flag for LocalStack).
76 ///
77 /// # Arguments
78 ///
79 /// * `config` - Pre-configured AWS SDK configuration
80 ///
81 /// # Example
82 ///
83 /// ```no_run
84 /// use dlq::DeadLetterQueue;
85 ///
86 /// # async fn example() {
87 /// let config = aws_config::from_env().load().await;
88 /// let dlq = DeadLetterQueue::from_config(config);
89 ///
90 /// // List all queues
91 /// let queues = dlq.list().await;
92 /// # }
93 /// ```
94 pub fn from_config(config: SdkConfig) -> Self {
95 let client = sqs::Client::new(&config);
96 Self { config, client }
97 }
98
99 /// Deletes a message from the queue using batch delete.
100 ///
101 /// This is primarily used internally to acknowledge processed messages.
102 ///
103 /// # Arguments
104 ///
105 /// * `url` - The queue URL
106 /// * `message_id` - The SQS message ID
107 /// * `receipt_handle` - The receipt handle from when the message was received
108 pub async fn _clear(&self, url: String, message_id: String, receipt_handle: String) {
109 self.client
110 .delete_message_batch()
111 .set_queue_url(Some(url))
112 .set_entries(Some(vec![DeleteMessageBatchRequestEntry::builder()
113 .set_id(Some(message_id))
114 .set_receipt_handle(Some(receipt_handle))
115 .build()
116 .unwrap()]))
117 .send()
118 .await
119 .unwrap();
120 }
121
122 /// Lists all SQS queue URLs in the AWS account.
123 ///
124 /// Handles pagination automatically, returning all queues regardless of count.
125 ///
126 /// # Returns
127 ///
128 /// A vector of queue URL strings.
129 ///
130 /// # Example
131 ///
132 /// ```no_run
133 /// use dlq::DeadLetterQueue;
134 ///
135 /// # async fn example() {
136 /// let config = aws_config::from_env().load().await;
137 /// let dlq = DeadLetterQueue::from_config(config);
138 ///
139 /// for queue_url in dlq.list().await {
140 /// println!("Found queue: {}", queue_url);
141 /// }
142 /// # }
143 /// ```
144 pub async fn list(&self) -> Vec<String> {
145 let mut queues = Vec::new();
146
147 let mut output = self.client.list_queues().send().await.unwrap();
148 loop {
149 if let Some(mut list) = output.queue_urls {
150 queues.append(&mut list);
151 }
152
153 let Some(token) = output.next_token else {
154 break;
155 };
156
157 output = self
158 .client
159 .list_queues()
160 .set_next_token(Some(token))
161 .send()
162 .await
163 .unwrap();
164 }
165
166 queues
167 }
168
169 /// Polls messages from a queue and prints them as JSON.
170 ///
171 /// Continuously receives messages until the queue is empty or the maximum
172 /// number of attempts (10) is reached. Each message is printed to stdout
173 /// as a JSON object.
174 ///
175 /// # Arguments
176 ///
177 /// * `queue_url` - The URL of the queue to poll messages from
178 ///
179 /// # Example
180 ///
181 /// ```no_run
182 /// use dlq::DeadLetterQueue;
183 ///
184 /// # async fn example() {
185 /// let config = aws_config::from_env().load().await;
186 /// let dlq = DeadLetterQueue::from_config(config);
187 ///
188 /// // Poll a specific queue
189 /// dlq.poll("https://sqs.us-east-1.amazonaws.com/123/my-dlq").await;
190 /// # }
191 /// ```
192 pub async fn poll(&self, queue_url: &str) {
193 let url = queue_url;
194 let max_tries = 10;
195 let mut tries = 0;
196 loop {
197 tries += 1;
198 if tries > max_tries {
199 return;
200 }
201
202 let output = receive(&self.client, url).await.unwrap();
203
204 // if none, that suggests the whole queue has been received recently
205 let Some(messages) = output.messages else {
206 return;
207 };
208
209 if messages.is_empty() {
210 return;
211 }
212
213 for m in messages {
214 println!(
215 "{}",
216 serde_json::to_string(&MessageModel::from_aws_message(m)).unwrap()
217 );
218 }
219 }
220 }
221}
222
223/// Serializable representation of an SQS message.
224///
225/// Contains the essential fields from an SQS message, formatted for
226/// JSON output when polling queues.
227#[derive(Clone, Debug, serde::Serialize)]
228pub struct MessageModel {
229 /// Unique identifier for the message assigned by SQS
230 pub message_id: String,
231 /// Handle used to delete or change visibility of the message
232 receipt_handle: String,
233 /// MD5 digest of the message body for integrity verification
234 md5_of_body: String,
235 /// The actual message content
236 pub body: String,
237 /// MD5 digest of message attributes (if any)
238 md5_of_message_attributes: Option<String>,
239 /// System attributes (currently not populated)
240 attributes: Option<String>,
241 /// Custom message attributes (currently not populated)
242 message_attributes: Option<String>,
243}
244
245impl MessageModel {
246 /// Converts an AWS SDK Message into a MessageModel.
247 ///
248 /// Extracts the required fields from the SQS message structure.
249 ///
250 /// # Panics
251 ///
252 /// Panics if the message is missing required fields (message_id, receipt_handle,
253 /// md5_of_body, or body).
254 ///
255 /// # See Also
256 ///
257 /// - [AWS SQS Message API Reference](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Message.html)
258 pub fn from_aws_message(message: aws_sdk_sqs::types::Message) -> Self {
259 Self {
260 message_id: message.message_id.expect("missing message_id"),
261 receipt_handle: message.receipt_handle.expect("missing receipt_handle"),
262 md5_of_body: message.md5_of_body.expect("missing md5_of_body"),
263 body: message.body.expect("missing body"),
264 md5_of_message_attributes: message.md5_of_message_attributes,
265 attributes: None, //value.attributes,
266 message_attributes: None, //value.message_attributes,
267 }
268 }
269}