dlq/
sqs.rs

1use anyhow::Context;
2use aws_config::SdkConfig;
3use aws_sdk_sqs as sqs;
4use sqs::types::DeleteMessageBatchRequestEntry;
5
6pub async fn list() {
7    let dlq = DeadLetterQueue::new().await;
8    let queues = dlq.list().await;
9    println!("{}", queues.join(","));
10}
11
12pub async fn poll(url: Option<&str>) {
13    let dlq = DeadLetterQueue::new().await;
14    dlq.poll(url).await;
15}
16
17pub async fn info() {
18    let config = aws_config::load_from_env().await;
19    println!(
20        "{:#}",
21        serde_json::json!({
22            "endpoint": config.endpoint_url(),
23            "region": config.region().map(|x| x.to_string())
24        })
25    );
26}
27
28pub async fn receive(
29    client: &aws_sdk_sqs::Client,
30    queue_url: &str,
31) -> anyhow::Result<aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput> {
32    let result = client
33        .receive_message()
34        .set_queue_url(Some(queue_url.to_string()))
35        .set_max_number_of_messages(Some(10))
36        .set_visibility_timeout(Some(15))
37        .message_system_attribute_names(sqs::types::MessageSystemAttributeName::All)
38        //.set_wait_time_seconds(Some(3))
39        .send()
40        .await;
41
42    result.context("failed to receive messages")
43}
44
45#[derive(Clone)]
46struct DeadLetterQueue {
47    pub _config: SdkConfig,
48    pub client: sqs::Client,
49    pub default_queue_url: Option<String>,
50}
51
52impl DeadLetterQueue {
53    pub async fn new() -> Self {
54        let config = aws_config::load_from_env().await;
55        let client = aws_sdk_sqs::Client::new(&config);
56
57        Self {
58            _config: config,
59            client,
60            default_queue_url: std::env::var("DLQ_URL")
61                .ok()
62                .or(Some(String::from("http://localhost:4566"))),
63        }
64    }
65
66    pub async fn _clear(&self, url: String, message_id: String, receipt_handle: String) {
67        self.client
68            .delete_message_batch()
69            .set_queue_url(Some(url))
70            .set_entries(Some(vec![DeleteMessageBatchRequestEntry::builder()
71                .set_id(Some(message_id))
72                .set_receipt_handle(Some(receipt_handle))
73                .build()
74                .unwrap()]))
75            .send()
76            .await
77            .unwrap();
78    }
79
80    pub async fn list(&self) -> Vec<String> {
81        let mut queues = Vec::new();
82
83        let mut output = self.client.list_queues().send().await.unwrap();
84        loop {
85            if let Some(mut list) = output.queue_urls {
86                queues.append(&mut list);
87            }
88
89            let Some(token) = output.next_token else {
90                break;
91            };
92
93            output = self
94                .client
95                .list_queues()
96                .set_next_token(Some(token))
97                .send()
98                .await
99                .unwrap();
100        }
101
102        queues
103    }
104
105    pub async fn poll(&self, queue_url: Option<&str>) {
106        let url = queue_url
107            .or(self.default_queue_url.as_deref())
108            .expect("failed: queue url was not specified");
109        let max_tries = 10;
110        let mut tries = 0;
111        loop {
112            tries += 1;
113            if tries > max_tries {
114                return;
115            }
116
117            let output = receive(&self.client, url).await.unwrap();
118
119            // if none, that suggests the whole queue has been received recently
120            let Some(messages) = output.messages else {
121                return;
122            };
123
124            if messages.is_empty() {
125                return;
126            }
127
128            for m in messages {
129                println!(
130                    "{}",
131                    serde_json::to_string(&MessageModel::from_aws_message(m)).unwrap()
132                );
133            }
134        }
135    }
136}
137
138#[derive(Clone, Debug, serde::Serialize)]
139pub struct MessageModel {
140    pub message_id: String,
141    receipt_handle: String,
142    md5_of_body: String,
143    pub body: String,
144    md5_of_message_attributes: Option<String>,
145    attributes: Option<String>,
146    message_attributes: Option<String>,
147}
148
149impl MessageModel {
150    /// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Message.html
151    pub fn from_aws_message(message: aws_sdk_sqs::types::Message) -> Self {
152        Self {
153            message_id: message.message_id.expect("missing message_id"),
154            receipt_handle: message.receipt_handle.expect("missing receipt_handle"),
155            md5_of_body: message.md5_of_body.expect("missing md5_of_body"),
156            body: message.body.expect("missing body"),
157            md5_of_message_attributes: message.md5_of_message_attributes,
158            attributes: None,         //value.attributes,
159            message_attributes: None, //value.message_attributes,
160        }
161    }
162}