outbox_pattern_processor/
sqs_notification_service.rs

1use crate::app_state::AppState;
2use crate::error::OutboxPatternProcessorError;
3use crate::notification::NotificationResult;
4use crate::outbox::Outbox;
5use crate::outbox_group::GroupedOutboxed;
6use aws_sdk_sqs::error::ProvideErrorMetadata;
7use aws_sdk_sqs::types::{MessageAttributeValue, SendMessageBatchRequestEntry};
8use tracing::instrument;
9use tracing::log::error;
10
11pub struct SqsNotificationService;
12
13impl SqsNotificationService {
14    #[instrument(skip_all, name = "send_to_sqs")]
15    pub async fn send(
16        app_state: &AppState,
17        outboxes: &GroupedOutboxed,
18    ) -> Result<NotificationResult, OutboxPatternProcessorError> {
19        let mut notification_result = NotificationResult::default();
20
21        let sqs_client = if let Some(client) = app_state.sqs_client.clone() {
22            client
23        } else {
24            notification_result.failed.extend(outboxes.sqs.values().flat_map(|it| it.clone()).collect::<Vec<_>>());
25            return Ok(notification_result);
26        };
27
28        for (queue_url, topic_outboxes) in outboxes.sqs.clone() {
29            let chunks = topic_outboxes.chunks(10).collect::<Vec<&[Outbox]>>();
30
31            for chunk in chunks {
32                let mut entries = vec![];
33                let mut outbox_entries = vec![];
34                for outbox in chunk {
35                    let idempotent_key_attribute_value_result = attribute_value(outbox, &outbox.idempotent_key.to_string());
36                    if idempotent_key_attribute_value_result.is_err() {
37                        notification_result.failed.push(outbox.clone());
38                        let error = idempotent_key_attribute_value_result.expect_err("Failed to get expect idempotent_key_attribute_value error");
39                        error!(
40                            "{} - Cause: {}",
41                            error.message.unwrap_or("Failed to create idempotent_key_attribute_value".to_string()),
42                            error.cause
43                        );
44                        break;
45                    }
46
47                    let idempotent_key_attribute_value = idempotent_key_attribute_value_result.expect("Failed to get expect idempotent_key_attribute_value");
48
49                    let mut entry_builder = SendMessageBatchRequestEntry::builder()
50                        .id(outbox.idempotent_key)
51                        .message_body(outbox.payload.clone())
52                        .message_attributes("x-idempotent-key", idempotent_key_attribute_value);
53
54                    if let Some(headers) = outbox.headers.clone() {
55                        for (key, value) in headers.0 {
56                            let attribute_value_result = attribute_value(outbox, &value);
57                            if attribute_value_result.is_err() {
58                                notification_result.failed.push(outbox.clone());
59                                let error = attribute_value_result.expect_err("Failed to get expect attribute_value error");
60                                error!("{} - Cause: {}", error.message.unwrap_or("Failed to create attribute_value".to_string()), error.cause);
61                                break;
62                            }
63
64                            let attribute_value = attribute_value_result.expect("Failed to get expect attribute_value");
65                            entry_builder = entry_builder.message_attributes(key, attribute_value);
66                        }
67                    }
68
69                    let entry = entry_builder.build().map_err(|error| {
70                        OutboxPatternProcessorError::new(
71                            &error.to_string(),
72                            &format!("Failed to create batch entry for outbox idempotent_key={}", outbox.idempotent_key),
73                        )
74                    })?;
75
76                    outbox_entries.push(outbox.clone());
77                    entries.push(entry);
78                }
79
80                let publish_result = sqs_client
81                    .client
82                    .send_message_batch()
83                    .queue_url(&queue_url)
84                    .set_entries(Some(entries))
85                    .send()
86                    .await
87                    .map_err(|error| {
88                        let body = error
89                            .raw_response()
90                            .map(|rr| rr.body())
91                            .map(|body| {
92                                if let Some(bytes) = body.bytes() {
93                                    String::from_utf8(bytes.to_vec()).ok().unwrap_or(String::from("Unknown: Failed to convert bytes to string"))
94                                } else {
95                                    String::from("Unknown: None bytes")
96                                }
97                            })
98                            .unwrap_or(String::from("Unknown"));
99
100                        OutboxPatternProcessorError::new(&body, error.message().unwrap_or("Failed to publish sqs batch"));
101                    });
102
103                if publish_result.is_ok() {
104                    notification_result.sent.extend(outbox_entries);
105                } else {
106                    notification_result.failed.extend(outbox_entries);
107                }
108            }
109        }
110
111        Ok(notification_result)
112    }
113}
114
115fn attribute_value(
116    outbox: &Outbox,
117    value: &str,
118) -> Result<MessageAttributeValue, OutboxPatternProcessorError> {
119    MessageAttributeValue::builder().data_type("String").string_value(value).build().map_err(|error| {
120        OutboxPatternProcessorError::new(
121            &error.to_string(),
122            &format!(
123                "Failed to create message attribute with value={} for outbox idempotent_key={}",
124                value, outbox.idempotent_key
125            ),
126        )
127    })
128}