myc_notifier/executor/
mod.rs1use chrono::Local;
2use myc_core::domain::dtos::message::{MessageSendingEvent, MessageStatus};
3use myc_core::domain::entities::{
4 LocalMessageReading, LocalMessageWrite, RemoteMessageWrite,
5};
6use mycelium_base::entities::FetchManyResponseKind;
7use mycelium_base::{
8 entities::CreateResponseKind,
9 utils::errors::{creation_err, MappedErrors},
10};
11use std::sync::Arc;
12use uuid::Uuid;
13
14#[tracing::instrument(
18 name = "consume_messages",
19 skip(
20 local_message_read_repo,
21 local_message_write_repo,
22 remote_message_write_repo
23 )
24)]
25pub async fn consume_messages(
26 queue_name: String,
27 local_message_read_repo: Arc<dyn LocalMessageReading>,
28 local_message_write_repo: Arc<dyn LocalMessageWrite>,
29 remote_message_write_repo: Arc<dyn RemoteMessageWrite>,
30) -> Result<(i32, i32), MappedErrors> {
31 let max_retries = 3;
32 let mut retries = 0;
33 let mut processed_messages_success = 0;
34 let mut processed_messages_failed = vec![];
35
36 loop {
40 retries += 1;
45
46 if retries >= max_retries {
47 break;
48 }
49
50 let events = match local_message_read_repo
51 .list_oldest_messages(25, MessageStatus::Queued)
52 .await?
53 {
54 FetchManyResponseKind::NotFound => break,
55 FetchManyResponseKind::Found(messages) => messages,
56 FetchManyResponseKind::FoundPaginated {
57 records, count, ..
58 } => {
59 if count == 0 {
60 break;
61 }
62
63 records
64 }
65 };
66
67 for event in events {
68 let mut _message = event.to_owned();
69
70 if let Err(err) =
71 process_record(event, remote_message_write_repo.clone()).await
72 {
73 _message.attempted = Some(Local::now());
74 _message.error = Some(err.to_string());
75 _message.attempts += 1;
76
77 if _message.attempts >= 5 {
78 _message.status = MessageStatus::Failed;
79 }
80
81 processed_messages_failed.push(_message.id);
82
83 if let Err(err) = local_message_write_repo
84 .update_message_event(_message)
85 .await
86 {
87 panic!("Failed to update message: {err}");
88 }
89 } else {
90 if !processed_messages_failed.contains(&_message.id) {
91 processed_messages_failed = processed_messages_failed
92 .into_iter()
93 .filter(|id| id != &_message.id)
94 .collect();
95 }
96
97 processed_messages_success += 1;
98
99 if let Err(err) = local_message_write_repo
100 .delete_message_event(_message.id)
101 .await
102 {
103 panic!("Failed to delete message: {err}");
104 }
105 }
106 }
107
108 if !processed_messages_failed.is_empty() {
112 continue;
113 }
114
115 break;
119 }
120
121 Ok((
122 processed_messages_success,
123 processed_messages_failed.len() as i32,
124 ))
125}
126
127async fn process_record(
128 record: MessageSendingEvent,
129 message_sending_repo: Arc<dyn RemoteMessageWrite>,
130) -> Result<Uuid, MappedErrors> {
131 if let CreateResponseKind::NotCreated(_, _) =
132 message_sending_repo.send(record.message).await?
133 {
134 return creation_err("Failed to send message")
135 .with_exp_true()
136 .as_error();
137 }
138
139 Ok(record.id)
140}