outbox_pattern_processor/
outbox_processor.rs

1use crate::app_state::AppState;
2use crate::error::OutboxPatternProcessorError;
3use crate::http_gateway::HttpGateway;
4use crate::http_notification_service::HttpNotificationService;
5use crate::outbox::Outbox;
6use crate::outbox_destination::OutboxDestination;
7use crate::outbox_group::GroupedOutboxed;
8use crate::outbox_repository::OutboxRepository;
9use crate::outbox_resources::OutboxProcessorResources;
10use crate::sns_notification_service::SnsNotificationService;
11use crate::sqs_notification_service::SqsNotificationService;
12use cron::Schedule;
13use sqlx::types::chrono::Utc;
14use std::future::Future;
15use std::str::FromStr;
16use std::time::Duration;
17use tracing::instrument;
18use tracing::log::{error, info};
19
20pub struct OutboxProcessor {
21    resources: OutboxProcessorResources,
22    signal: Option<Box<dyn Future<Output = ()> + Send>>,
23}
24
25impl OutboxProcessor {
26    pub fn new(resources: OutboxProcessorResources) -> Self {
27        Self { resources, signal: None }
28    }
29
30    pub fn with_graceful_shutdown(
31        &self,
32        signal: impl Future<Output = ()> + Send + 'static,
33    ) -> Self {
34        Self {
35            resources: self.resources.clone(),
36            signal: Some(Box::new(signal)),
37        }
38    }
39
40    pub async fn init_process(self) -> Result<(), OutboxPatternProcessorError> {
41        info!("Starting outbox processor...");
42
43        if let Some(box_signal) = self.signal {
44            let mut shutdown_signal = Box::into_pin(box_signal);
45
46            info!("Running outbox processor...");
47            loop {
48                tokio::select! {
49                    result = OutboxProcessor::one_shot_process(&self.resources) => {
50                        match result {
51                            Ok(processed_len) => {
52                                if processed_len == 0 {
53                                    tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
54                                }
55                            }
56                            Err(error) => {
57                                error!("Outbox processor failed with error: {}", error.to_string());
58                                tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
59                            }
60                        }
61                    }
62                    _ = &mut shutdown_signal => {
63                        break;
64                    }
65                }
66            }
67        } else {
68            loop {
69                let result = OutboxProcessor::one_shot_process(&self.resources).await;
70                match result {
71                    Ok(processed_len) => {
72                        if processed_len == 0 {
73                            tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
74                        }
75                    },
76                    Err(error) => {
77                        error!("Outbox processor failed with error: {}", error.to_string());
78                        tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
79                    },
80                }
81            }
82        }
83
84        info!("Outbox processor stopped!");
85
86        Ok(())
87    }
88
89    pub async fn init_processed_locked_cleaner(self) -> Result<(), OutboxPatternProcessorError> {
90        info!("Starting outbox cleaner processor...");
91
92        if let Some(box_signal) = self.signal {
93            let mut shutdown_signal = Box::into_pin(box_signal);
94
95            loop {
96                tokio::select! {
97                    _ = OutboxProcessor::one_shot_processed_locked_cleaner(&self.resources) => {
98                        tokio::time::sleep(Duration::from_secs(self.resources.outbox_cleaner_execution_interval_in_seconds.unwrap_or(60))).await;
99                    }
100                    _ = &mut shutdown_signal => {
101                        break;
102                    }
103                }
104            }
105        } else {
106            loop {
107                let result = OutboxProcessor::one_shot_processed_locked_cleaner(&self.resources).await;
108                if result.is_err() {
109                    let error = result.expect_err("Failed to get expected error");
110                    error!("Outbox processor cleaner failed with error: {}", error.to_string());
111                }
112                tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
113            }
114        }
115
116        info!("Outbox processor cleaner stopped!");
117
118        Ok(())
119    }
120
121    fn create_app_state(resources: &OutboxProcessorResources) -> Result<AppState, OutboxPatternProcessorError> {
122        Ok(AppState {
123            postgres_pool: resources.postgres_pool.clone(),
124            sqs_client: resources.sqs_client.clone(),
125            sns_client: resources.sns_client.clone(),
126            http_gateway: HttpGateway::new(resources.http_timeout_in_millis.unwrap_or(3000))?,
127            outbox_query_limit: resources.outbox_query_limit,
128            delete_after_process_successfully: resources.delete_after_process_successfully,
129            max_in_flight_interval_in_seconds: resources.max_in_flight_interval_in_seconds,
130            outbox_failure_limit: resources.outbox_failure_limit,
131            scheduled_clear_locked_partition: resources.scheduled_clear_locked_partition,
132            delay_for_failure_attempt_in_seconds: resources.delay_for_failure_attempt_in_seconds,
133        })
134    }
135
136    #[instrument(skip_all, name = "outbox-pattern-processor-cleaner")]
137    pub async fn one_shot_processed_locked_cleaner(resources: &OutboxProcessorResources) -> Result<(), OutboxPatternProcessorError> {
138        let app_state = Self::create_app_state(resources)?;
139
140        let mut transaction = app_state.begin_transaction().await?;
141
142        if let Some(outbox_clear_schedule) = OutboxRepository::find_cleaner_schedule(&mut transaction).await? {
143            if let Ok(schedule) = Schedule::from_str(&outbox_clear_schedule.cron_expression) {
144                if let Some(next_execution) = schedule.after(&outbox_clear_schedule.last_execution).next() {
145                    let seconds_until_next_execution = (next_execution - Utc::now()).num_seconds();
146                    if seconds_until_next_execution <= 0 {
147                        OutboxRepository::clear_processed_locked_partition_key(&mut transaction).await?;
148                        OutboxRepository::update_last_cleaner_execution(&mut transaction).await?;
149                        app_state.commit_transaction(transaction).await?;
150                    }
151                }
152            }
153        }
154
155        Ok(())
156    }
157
158    #[instrument(skip_all, name = "outbox-pattern-processor")]
159    pub async fn one_shot_process(resources: &OutboxProcessorResources) -> Result<usize, OutboxPatternProcessorError> {
160        let app_state = Self::create_app_state(resources)?;
161
162        let outboxes = OutboxRepository::list(&app_state).await?;
163        let outboxes_len = outboxes.len();
164
165        let grouped_outboxes = Self::group_by_destination(outboxes.clone());
166
167        let sqs_notification_result = SqsNotificationService::send(&app_state, &grouped_outboxes).await?;
168        let sns_notification_result = SnsNotificationService::send(&app_state, &grouped_outboxes).await?;
169        let http_notification_result = HttpNotificationService::send(&app_state, &grouped_outboxes).await?;
170
171        let mut failure_outbox = vec![];
172        failure_outbox.extend(sqs_notification_result.failed);
173        failure_outbox.extend(sns_notification_result.failed);
174        failure_outbox.extend(http_notification_result.failed);
175
176        let successfully_outboxes = outboxes
177            .into_iter()
178            .filter(|it| !failure_outbox.iter().any(|failed| failed.idempotent_key == it.idempotent_key))
179            .collect::<Vec<Outbox>>();
180
181        let mut transaction = app_state.begin_transaction().await?;
182
183        if app_state.delete_after_process_successfully.unwrap_or(false) {
184            OutboxRepository::delete_processed(&app_state, &mut transaction, &successfully_outboxes).await?;
185        } else {
186            OutboxRepository::mark_as_processed(&app_state, &mut transaction, &successfully_outboxes).await?;
187        }
188
189        if !failure_outbox.is_empty() {
190            OutboxRepository::increase_attempts(&app_state, &mut transaction, &failure_outbox).await?;
191        }
192
193        app_state.commit_transaction(transaction).await?;
194
195        Ok(outboxes_len)
196    }
197
198    #[instrument(skip_all)]
199    fn group_by_destination(outboxes: Vec<Outbox>) -> GroupedOutboxed {
200        let mut grouped_outboxes = GroupedOutboxed::default();
201
202        for outbox in outboxes {
203            for destination in outbox.destinations.0.clone() {
204                match destination {
205                    OutboxDestination::SqsDestination(sqs) => {
206                        grouped_outboxes.sqs.entry(sqs.queue_url).or_insert(vec![]).push(outbox.clone());
207                    },
208                    OutboxDestination::SnsDestination(sns) => {
209                        grouped_outboxes.sns.entry(sns.topic_arn).or_insert(vec![]).push(outbox.clone());
210                    },
211                    OutboxDestination::HttpDestination(_) => {
212                        grouped_outboxes.http.push(outbox.clone());
213                    },
214                }
215            }
216        }
217
218        grouped_outboxes
219    }
220}