1use anyhow::Context;
2use aws_config::SdkConfig;
3use aws_sdk_sqs as sqs;
4use sqs::types::DeleteMessageBatchRequestEntry;
5
6pub async fn list() {
7 let dlq = DeadLetterQueue::new().await;
8 let queues = dlq.list().await;
9 println!("{}", queues.join(","));
10}
11
12pub async fn poll(url: Option<&str>) {
13 let dlq = DeadLetterQueue::new().await;
14 dlq.poll(url).await;
15}
16
17pub async fn info() {
18 let config = aws_config::load_from_env().await;
19 println!(
20 "{:#}",
21 serde_json::json!({
22 "endpoint": config.endpoint_url(),
23 "region": config.region().map(|x| x.to_string())
24 })
25 );
26}
27
28pub async fn receive(
29 client: &aws_sdk_sqs::Client,
30 queue_url: &str,
31) -> anyhow::Result<aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput> {
32 let result = client
33 .receive_message()
34 .set_queue_url(Some(queue_url.to_string()))
35 .set_max_number_of_messages(Some(10))
36 .set_visibility_timeout(Some(15))
37 .message_system_attribute_names(sqs::types::MessageSystemAttributeName::All)
38 .send()
40 .await;
41
42 result.context("failed to receive messages")
43}
44
45#[derive(Clone)]
46struct DeadLetterQueue {
47 pub _config: SdkConfig,
48 pub client: sqs::Client,
49 pub default_queue_url: Option<String>,
50}
51
52impl DeadLetterQueue {
53 pub async fn new() -> Self {
54 let config = aws_config::load_from_env().await;
55 let client = aws_sdk_sqs::Client::new(&config);
56
57 Self {
58 _config: config,
59 client,
60 default_queue_url: std::env::var("DLQ_URL")
61 .ok()
62 .or(Some(String::from("http://localhost:4566"))),
63 }
64 }
65
66 pub async fn _clear(&self, url: String, message_id: String, receipt_handle: String) {
67 self.client
68 .delete_message_batch()
69 .set_queue_url(Some(url))
70 .set_entries(Some(vec![DeleteMessageBatchRequestEntry::builder()
71 .set_id(Some(message_id))
72 .set_receipt_handle(Some(receipt_handle))
73 .build()
74 .unwrap()]))
75 .send()
76 .await
77 .unwrap();
78 }
79
80 pub async fn list(&self) -> Vec<String> {
81 let mut queues = Vec::new();
82
83 let mut output = self.client.list_queues().send().await.unwrap();
84 loop {
85 if let Some(mut list) = output.queue_urls {
86 queues.append(&mut list);
87 }
88
89 let Some(token) = output.next_token else {
90 break;
91 };
92
93 output = self
94 .client
95 .list_queues()
96 .set_next_token(Some(token))
97 .send()
98 .await
99 .unwrap();
100 }
101
102 queues
103 }
104
105 pub async fn poll(&self, queue_url: Option<&str>) {
106 let url = queue_url
107 .or(self.default_queue_url.as_deref())
108 .expect("failed: queue url was not specified");
109 let max_tries = 10;
110 let mut tries = 0;
111 loop {
112 tries += 1;
113 if tries > max_tries {
114 return;
115 }
116
117 let output = receive(&self.client, url).await.unwrap();
118
119 let Some(messages) = output.messages else {
121 return;
122 };
123
124 if messages.is_empty() {
125 return;
126 }
127
128 for m in messages {
129 println!(
130 "{}",
131 serde_json::to_string(&MessageModel::from_aws_message(m)).unwrap()
132 );
133 }
134 }
135 }
136}
137
138#[derive(Clone, Debug, serde::Serialize)]
139pub struct MessageModel {
140 pub message_id: String,
141 receipt_handle: String,
142 md5_of_body: String,
143 pub body: String,
144 md5_of_message_attributes: Option<String>,
145 attributes: Option<String>,
146 message_attributes: Option<String>,
147}
148
149impl MessageModel {
150 pub fn from_aws_message(message: aws_sdk_sqs::types::Message) -> Self {
152 Self {
153 message_id: message.message_id.expect("missing message_id"),
154 receipt_handle: message.receipt_handle.expect("missing receipt_handle"),
155 md5_of_body: message.md5_of_body.expect("missing md5_of_body"),
156 body: message.body.expect("missing body"),
157 md5_of_message_attributes: message.md5_of_message_attributes,
158 attributes: None, message_attributes: None, }
161 }
162}