outbox_pattern_processor/
sqs_notification_service.rs1use crate::app_state::AppState;
2use crate::error::OutboxPatternProcessorError;
3use crate::notification::NotificationResult;
4use crate::outbox::Outbox;
5use crate::outbox_group::GroupedOutboxed;
6use aws_sdk_sqs::error::ProvideErrorMetadata;
7use aws_sdk_sqs::types::{MessageAttributeValue, SendMessageBatchRequestEntry};
8use tracing::instrument;
9use tracing::log::error;
10
11pub struct SqsNotificationService;
12
13impl SqsNotificationService {
14 #[instrument(skip_all, name = "send_to_sqs")]
15 pub async fn send(
16 app_state: &AppState,
17 outboxes: &GroupedOutboxed,
18 ) -> Result<NotificationResult, OutboxPatternProcessorError> {
19 let mut notification_result = NotificationResult::default();
20
21 let sqs_client = if let Some(client) = app_state.sqs_client.clone() {
22 client
23 } else {
24 notification_result.failed.extend(outboxes.sqs.values().flat_map(|it| it.clone()).collect::<Vec<_>>());
25 return Ok(notification_result);
26 };
27
28 for (queue_url, topic_outboxes) in outboxes.sqs.clone() {
29 let chunks = topic_outboxes.chunks(10).collect::<Vec<&[Outbox]>>();
30
31 for chunk in chunks {
32 let mut entries = vec![];
33 let mut outbox_entries = vec![];
34 for outbox in chunk {
35 let idempotent_key_attribute_value_result = attribute_value(outbox, &outbox.idempotent_key.to_string());
36 if idempotent_key_attribute_value_result.is_err() {
37 notification_result.failed.push(outbox.clone());
38 let error = idempotent_key_attribute_value_result.expect_err("Failed to get expect idempotent_key_attribute_value error");
39 error!(
40 "{} - Cause: {}",
41 error.message.unwrap_or("Failed to create idempotent_key_attribute_value".to_string()),
42 error.cause
43 );
44 break;
45 }
46
47 let idempotent_key_attribute_value = idempotent_key_attribute_value_result.expect("Failed to get expect idempotent_key_attribute_value");
48
49 let mut entry_builder = SendMessageBatchRequestEntry::builder()
50 .id(outbox.idempotent_key)
51 .message_body(outbox.payload.clone())
52 .message_attributes("x-idempotent-key", idempotent_key_attribute_value);
53
54 if let Some(headers) = outbox.headers.clone() {
55 for (key, value) in headers.0 {
56 let attribute_value_result = attribute_value(outbox, &value);
57 if attribute_value_result.is_err() {
58 notification_result.failed.push(outbox.clone());
59 let error = attribute_value_result.expect_err("Failed to get expect attribute_value error");
60 error!("{} - Cause: {}", error.message.unwrap_or("Failed to create attribute_value".to_string()), error.cause);
61 break;
62 }
63
64 let attribute_value = attribute_value_result.expect("Failed to get expect attribute_value");
65 entry_builder = entry_builder.message_attributes(key, attribute_value);
66 }
67 }
68
69 let entry = entry_builder.build().map_err(|error| {
70 OutboxPatternProcessorError::new(
71 &error.to_string(),
72 &format!("Failed to create batch entry for outbox idempotent_key={}", outbox.idempotent_key),
73 )
74 })?;
75
76 outbox_entries.push(outbox.clone());
77 entries.push(entry);
78 }
79
80 let publish_result = sqs_client
81 .client
82 .send_message_batch()
83 .queue_url(&queue_url)
84 .set_entries(Some(entries))
85 .send()
86 .await
87 .map_err(|error| {
88 let body = error
89 .raw_response()
90 .map(|rr| rr.body())
91 .map(|body| {
92 if let Some(bytes) = body.bytes() {
93 String::from_utf8(bytes.to_vec()).ok().unwrap_or(String::from("Unknown: Failed to convert bytes to string"))
94 } else {
95 String::from("Unknown: None bytes")
96 }
97 })
98 .unwrap_or(String::from("Unknown"));
99
100 OutboxPatternProcessorError::new(&body, error.message().unwrap_or("Failed to publish sqs batch"));
101 });
102
103 if publish_result.is_ok() {
104 notification_result.sent.extend(outbox_entries);
105 } else {
106 notification_result.failed.extend(outbox_entries);
107 }
108 }
109 }
110
111 Ok(notification_result)
112 }
113}
114
115fn attribute_value(
116 outbox: &Outbox,
117 value: &str,
118) -> Result<MessageAttributeValue, OutboxPatternProcessorError> {
119 MessageAttributeValue::builder().data_type("String").string_value(value).build().map_err(|error| {
120 OutboxPatternProcessorError::new(
121 &error.to_string(),
122 &format!(
123 "Failed to create message attribute with value={} for outbox idempotent_key={}",
124 value, outbox.idempotent_key
125 ),
126 )
127 })
128}