dlq/
sqs.rs

1//! SQS client wrapper and message types for dead letter queue operations.
2
3use anyhow::Context;
4use aws_config::SdkConfig;
5use aws_sdk_sqs as sqs;
6use sqs::types::DeleteMessageBatchRequestEntry;
7
8/// Receives messages from an SQS queue.
9///
10/// Retrieves up to 10 messages at a time with a 15-second visibility timeout.
11/// Messages become invisible to other consumers during this period.
12///
13/// # Arguments
14///
15/// * `client` - The SQS client to use for the request
16/// * `queue_url` - The URL of the queue to receive messages from
17///
18/// # Returns
19///
20/// The raw SQS receive message output containing the messages and metadata.
21///
22/// # Errors
23///
24/// Returns an error if the SQS API call fails.
25pub async fn receive(
26    client: &aws_sdk_sqs::Client,
27    queue_url: &str,
28) -> anyhow::Result<aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput> {
29    let result = client
30        .receive_message()
31        .set_queue_url(Some(queue_url.to_string()))
32        .set_max_number_of_messages(Some(10))
33        .set_visibility_timeout(Some(15))
34        .message_system_attribute_names(sqs::types::MessageSystemAttributeName::All)
35        //.set_wait_time_seconds(Some(3))
36        .send()
37        .await;
38
39    result.context("failed to receive messages")
40}
41
42/// Client for interacting with AWS SQS dead letter queues.
43///
44/// Provides high-level operations for listing queues, polling messages,
45/// and clearing messages from queues.
46///
47/// # Example
48///
49/// ```no_run
50/// use dlq::DeadLetterQueue;
51///
52/// # async fn example() {
53/// let config = aws_config::from_env().load().await;
54/// let dlq = DeadLetterQueue::from_config(config);
55///
56/// // List all queues
57/// let queues = dlq.list().await;
58///
59/// // Poll messages from a queue
60/// dlq.poll("https://sqs.us-east-1.amazonaws.com/123456789/my-dlq").await;
61/// # }
62/// ```
63#[derive(Clone)]
64pub struct DeadLetterQueue {
65    /// The AWS SDK configuration used for SQS operations
66    pub config: SdkConfig,
67    /// The SQS client instance
68    pub client: sqs::Client,
69}
70
71impl DeadLetterQueue {
72    /// Creates a DeadLetterQueue from a pre-built AWS SDK config.
73    ///
74    /// This is the preferred constructor as it allows the caller to configure
75    /// credentials and endpoints based on their needs (e.g., `--local` flag for LocalStack).
76    ///
77    /// # Arguments
78    ///
79    /// * `config` - Pre-configured AWS SDK configuration
80    ///
81    /// # Example
82    ///
83    /// ```no_run
84    /// use dlq::DeadLetterQueue;
85    ///
86    /// # async fn example() {
87    /// let config = aws_config::from_env().load().await;
88    /// let dlq = DeadLetterQueue::from_config(config);
89    ///
90    /// // List all queues
91    /// let queues = dlq.list().await;
92    /// # }
93    /// ```
94    pub fn from_config(config: SdkConfig) -> Self {
95        let client = sqs::Client::new(&config);
96        Self { config, client }
97    }
98
99    /// Deletes a message from the queue using batch delete.
100    ///
101    /// This is primarily used internally to acknowledge processed messages.
102    ///
103    /// # Arguments
104    ///
105    /// * `url` - The queue URL
106    /// * `message_id` - The SQS message ID
107    /// * `receipt_handle` - The receipt handle from when the message was received
108    pub async fn _clear(&self, url: String, message_id: String, receipt_handle: String) {
109        self.client
110            .delete_message_batch()
111            .set_queue_url(Some(url))
112            .set_entries(Some(vec![DeleteMessageBatchRequestEntry::builder()
113                .set_id(Some(message_id))
114                .set_receipt_handle(Some(receipt_handle))
115                .build()
116                .unwrap()]))
117            .send()
118            .await
119            .unwrap();
120    }
121
122    /// Lists all SQS queue URLs in the AWS account.
123    ///
124    /// Handles pagination automatically, returning all queues regardless of count.
125    ///
126    /// # Returns
127    ///
128    /// A vector of queue URL strings.
129    ///
130    /// # Example
131    ///
132    /// ```no_run
133    /// use dlq::DeadLetterQueue;
134    ///
135    /// # async fn example() {
136    /// let config = aws_config::from_env().load().await;
137    /// let dlq = DeadLetterQueue::from_config(config);
138    ///
139    /// for queue_url in dlq.list().await {
140    ///     println!("Found queue: {}", queue_url);
141    /// }
142    /// # }
143    /// ```
144    pub async fn list(&self) -> Vec<String> {
145        let mut queues = Vec::new();
146
147        let mut output = self.client.list_queues().send().await.unwrap();
148        loop {
149            if let Some(mut list) = output.queue_urls {
150                queues.append(&mut list);
151            }
152
153            let Some(token) = output.next_token else {
154                break;
155            };
156
157            output = self
158                .client
159                .list_queues()
160                .set_next_token(Some(token))
161                .send()
162                .await
163                .unwrap();
164        }
165
166        queues
167    }
168
169    /// Polls messages from a queue and prints them as JSON.
170    ///
171    /// Continuously receives messages until the queue is empty or the maximum
172    /// number of attempts (10) is reached. Each message is printed to stdout
173    /// as a JSON object.
174    ///
175    /// # Arguments
176    ///
177    /// * `queue_url` - The URL of the queue to poll messages from
178    ///
179    /// # Example
180    ///
181    /// ```no_run
182    /// use dlq::DeadLetterQueue;
183    ///
184    /// # async fn example() {
185    /// let config = aws_config::from_env().load().await;
186    /// let dlq = DeadLetterQueue::from_config(config);
187    ///
188    /// // Poll a specific queue
189    /// dlq.poll("https://sqs.us-east-1.amazonaws.com/123/my-dlq").await;
190    /// # }
191    /// ```
192    pub async fn poll(&self, queue_url: &str) {
193        let url = queue_url;
194        let max_tries = 10;
195        let mut tries = 0;
196        loop {
197            tries += 1;
198            if tries > max_tries {
199                return;
200            }
201
202            let output = receive(&self.client, url).await.unwrap();
203
204            // if none, that suggests the whole queue has been received recently
205            let Some(messages) = output.messages else {
206                return;
207            };
208
209            if messages.is_empty() {
210                return;
211            }
212
213            for m in messages {
214                println!(
215                    "{}",
216                    serde_json::to_string(&MessageModel::from_aws_message(m)).unwrap()
217                );
218            }
219        }
220    }
221}
222
223/// Serializable representation of an SQS message.
224///
225/// Contains the essential fields from an SQS message, formatted for
226/// JSON output when polling queues.
227#[derive(Clone, Debug, serde::Serialize)]
228pub struct MessageModel {
229    /// Unique identifier for the message assigned by SQS
230    pub message_id: String,
231    /// Handle used to delete or change visibility of the message
232    receipt_handle: String,
233    /// MD5 digest of the message body for integrity verification
234    md5_of_body: String,
235    /// The actual message content
236    pub body: String,
237    /// MD5 digest of message attributes (if any)
238    md5_of_message_attributes: Option<String>,
239    /// System attributes (currently not populated)
240    attributes: Option<String>,
241    /// Custom message attributes (currently not populated)
242    message_attributes: Option<String>,
243}
244
245impl MessageModel {
246    /// Converts an AWS SDK Message into a MessageModel.
247    ///
248    /// Extracts the required fields from the SQS message structure.
249    ///
250    /// # Panics
251    ///
252    /// Panics if the message is missing required fields (message_id, receipt_handle,
253    /// md5_of_body, or body).
254    ///
255    /// # See Also
256    ///
257    /// - [AWS SQS Message API Reference](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Message.html)
258    pub fn from_aws_message(message: aws_sdk_sqs::types::Message) -> Self {
259        Self {
260            message_id: message.message_id.expect("missing message_id"),
261            receipt_handle: message.receipt_handle.expect("missing receipt_handle"),
262            md5_of_body: message.md5_of_body.expect("missing md5_of_body"),
263            body: message.body.expect("missing body"),
264            md5_of_message_attributes: message.md5_of_message_attributes,
265            attributes: None,         //value.attributes,
266            message_attributes: None, //value.message_attributes,
267        }
268    }
269}