myc_notifier/executor/
mod.rs

1use chrono::Local;
2use myc_core::domain::dtos::message::{MessageSendingEvent, MessageStatus};
3use myc_core::domain::entities::{
4    LocalMessageReading, LocalMessageWrite, RemoteMessageWrite,
5};
6use mycelium_base::entities::FetchManyResponseKind;
7use mycelium_base::{
8    entities::CreateResponseKind,
9    utils::errors::{creation_err, MappedErrors},
10};
11use std::sync::Arc;
12use uuid::Uuid;
13
14/// Consumes messages from the message queue
15///
16/// This function consumes messages from the message queue sending by smtp.
17#[tracing::instrument(
18    name = "consume_messages",
19    skip(
20        local_message_read_repo,
21        local_message_write_repo,
22        remote_message_write_repo
23    )
24)]
25pub async fn consume_messages(
26    queue_name: String,
27    local_message_read_repo: Arc<dyn LocalMessageReading>,
28    local_message_write_repo: Arc<dyn LocalMessageWrite>,
29    remote_message_write_repo: Arc<dyn RemoteMessageWrite>,
30) -> Result<(i32, i32), MappedErrors> {
31    let max_retries = 3;
32    let mut retries = 0;
33    let mut processed_messages_success = 0;
34    let mut processed_messages_failed = vec![];
35
36    //
37    // Consume the queue up to the end
38    //
39    loop {
40        //
41        // Update retries counter and check if the maximum
42        // number of retries was reached
43        //
44        retries += 1;
45
46        if retries >= max_retries {
47            break;
48        }
49
50        let events = match local_message_read_repo
51            .list_oldest_messages(25, MessageStatus::Queued)
52            .await?
53        {
54            FetchManyResponseKind::NotFound => break,
55            FetchManyResponseKind::Found(messages) => messages,
56            FetchManyResponseKind::FoundPaginated {
57                records, count, ..
58            } => {
59                if count == 0 {
60                    break;
61                }
62
63                records
64            }
65        };
66
67        for event in events {
68            let mut _message = event.to_owned();
69
70            if let Err(err) =
71                process_record(event, remote_message_write_repo.clone()).await
72            {
73                _message.attempted = Some(Local::now());
74                _message.error = Some(err.to_string());
75                _message.attempts += 1;
76
77                if _message.attempts >= 5 {
78                    _message.status = MessageStatus::Failed;
79                }
80
81                processed_messages_failed.push(_message.id);
82
83                if let Err(err) = local_message_write_repo
84                    .update_message_event(_message)
85                    .await
86                {
87                    panic!("Failed to update message: {err}");
88                }
89            } else {
90                if !processed_messages_failed.contains(&_message.id) {
91                    processed_messages_failed = processed_messages_failed
92                        .into_iter()
93                        .filter(|id| id != &_message.id)
94                        .collect();
95                }
96
97                processed_messages_success += 1;
98
99                if let Err(err) = local_message_write_repo
100                    .delete_message_event(_message.id)
101                    .await
102                {
103                    panic!("Failed to delete message: {err}");
104                }
105            }
106        }
107
108        //
109        // If there are failed messages, do not break the loop, try again
110        //
111        if !processed_messages_failed.is_empty() {
112            continue;
113        }
114
115        //
116        // If there are no failed messages, break the loop
117        //
118        break;
119    }
120
121    Ok((
122        processed_messages_success,
123        processed_messages_failed.len() as i32,
124    ))
125}
126
127async fn process_record(
128    record: MessageSendingEvent,
129    message_sending_repo: Arc<dyn RemoteMessageWrite>,
130) -> Result<Uuid, MappedErrors> {
131    if let CreateResponseKind::NotCreated(_, _) =
132        message_sending_repo.send(record.message).await?
133    {
134        return creation_err("Failed to send message")
135            .with_exp_true()
136            .as_error();
137    }
138
139    Ok(record.id)
140}