// outbox_pattern_processor/outbox_processor.rs
use crate::app_state::AppState;
2use crate::error::OutboxPatternProcessorError;
3use crate::http_gateway::HttpGateway;
4use crate::http_notification_service::HttpNotificationService;
5use crate::outbox::Outbox;
6use crate::outbox_destination::OutboxDestination;
7use crate::outbox_group::GroupedOutboxed;
8use crate::outbox_repository::OutboxRepository;
9use crate::outbox_resources::OutboxProcessorResources;
10use crate::sns_notification_service::SnsNotificationService;
11use crate::sqs_notification_service::SqsNotificationService;
12use cron::Schedule;
13use sqlx::types::chrono::Utc;
14use std::future::Future;
15use std::str::FromStr;
16use std::time::Duration;
17use tracing::instrument;
18use tracing::log::{error, info};
19
20pub struct OutboxProcessor {
21 resources: OutboxProcessorResources,
22 signal: Option<Box<dyn Future<Output = ()> + Send>>,
23}
24
25impl OutboxProcessor {
26 pub fn new(resources: OutboxProcessorResources) -> Self {
27 Self { resources, signal: None }
28 }
29
30 pub fn with_graceful_shutdown(
31 &self,
32 signal: impl Future<Output = ()> + Send + 'static,
33 ) -> Self {
34 Self {
35 resources: self.resources.clone(),
36 signal: Some(Box::new(signal)),
37 }
38 }
39
40 pub async fn init_process(self) -> Result<(), OutboxPatternProcessorError> {
41 info!("Starting outbox processor...");
42
43 if let Some(box_signal) = self.signal {
44 let mut shutdown_signal = Box::into_pin(box_signal);
45
46 info!("Running outbox processor...");
47 loop {
48 tokio::select! {
49 result = OutboxProcessor::one_shot_process(&self.resources) => {
50 match result {
51 Ok(processed_len) => {
52 if processed_len == 0 {
53 tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
54 }
55 }
56 Err(error) => {
57 error!("Outbox processor failed with error: {}", error.to_string());
58 tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
59 }
60 }
61 }
62 _ = &mut shutdown_signal => {
63 break;
64 }
65 }
66 }
67 } else {
68 loop {
69 let result = OutboxProcessor::one_shot_process(&self.resources).await;
70 match result {
71 Ok(processed_len) => {
72 if processed_len == 0 {
73 tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
74 }
75 },
76 Err(error) => {
77 error!("Outbox processor failed with error: {}", error.to_string());
78 tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
79 },
80 }
81 }
82 }
83
84 info!("Outbox processor stopped!");
85
86 Ok(())
87 }
88
89 pub async fn init_processed_locked_cleaner(self) -> Result<(), OutboxPatternProcessorError> {
90 info!("Starting outbox cleaner processor...");
91
92 if let Some(box_signal) = self.signal {
93 let mut shutdown_signal = Box::into_pin(box_signal);
94
95 loop {
96 tokio::select! {
97 _ = OutboxProcessor::one_shot_processed_locked_cleaner(&self.resources) => {
98 tokio::time::sleep(Duration::from_secs(self.resources.outbox_cleaner_execution_interval_in_seconds.unwrap_or(60))).await;
99 }
100 _ = &mut shutdown_signal => {
101 break;
102 }
103 }
104 }
105 } else {
106 loop {
107 let result = OutboxProcessor::one_shot_processed_locked_cleaner(&self.resources).await;
108 if result.is_err() {
109 let error = result.expect_err("Failed to get expected error");
110 error!("Outbox processor cleaner failed with error: {}", error.to_string());
111 }
112 tokio::time::sleep(Duration::from_secs(self.resources.outbox_execution_interval_in_seconds.unwrap_or(5))).await;
113 }
114 }
115
116 info!("Outbox processor cleaner stopped!");
117
118 Ok(())
119 }
120
121 fn create_app_state(resources: &OutboxProcessorResources) -> Result<AppState, OutboxPatternProcessorError> {
122 Ok(AppState {
123 postgres_pool: resources.postgres_pool.clone(),
124 sqs_client: resources.sqs_client.clone(),
125 sns_client: resources.sns_client.clone(),
126 http_gateway: HttpGateway::new(resources.http_timeout_in_millis.unwrap_or(3000))?,
127 outbox_query_limit: resources.outbox_query_limit,
128 delete_after_process_successfully: resources.delete_after_process_successfully,
129 max_in_flight_interval_in_seconds: resources.max_in_flight_interval_in_seconds,
130 outbox_failure_limit: resources.outbox_failure_limit,
131 scheduled_clear_locked_partition: resources.scheduled_clear_locked_partition,
132 delay_for_failure_attempt_in_seconds: resources.delay_for_failure_attempt_in_seconds,
133 })
134 }
135
136 #[instrument(skip_all, name = "outbox-pattern-processor-cleaner")]
137 pub async fn one_shot_processed_locked_cleaner(resources: &OutboxProcessorResources) -> Result<(), OutboxPatternProcessorError> {
138 let app_state = Self::create_app_state(resources)?;
139
140 let mut transaction = app_state.begin_transaction().await?;
141
142 if let Some(outbox_clear_schedule) = OutboxRepository::find_cleaner_schedule(&mut transaction).await? {
143 if let Ok(schedule) = Schedule::from_str(&outbox_clear_schedule.cron_expression) {
144 if let Some(next_execution) = schedule.after(&outbox_clear_schedule.last_execution).next() {
145 let seconds_until_next_execution = (next_execution - Utc::now()).num_seconds();
146 if seconds_until_next_execution <= 0 {
147 OutboxRepository::clear_processed_locked_partition_key(&mut transaction).await?;
148 OutboxRepository::update_last_cleaner_execution(&mut transaction).await?;
149 app_state.commit_transaction(transaction).await?;
150 }
151 }
152 }
153 }
154
155 Ok(())
156 }
157
158 #[instrument(skip_all, name = "outbox-pattern-processor")]
159 pub async fn one_shot_process(resources: &OutboxProcessorResources) -> Result<usize, OutboxPatternProcessorError> {
160 let app_state = Self::create_app_state(resources)?;
161
162 let outboxes = OutboxRepository::list(&app_state).await?;
163 let outboxes_len = outboxes.len();
164
165 let grouped_outboxes = Self::group_by_destination(outboxes.clone());
166
167 let sqs_notification_result = SqsNotificationService::send(&app_state, &grouped_outboxes).await?;
168 let sns_notification_result = SnsNotificationService::send(&app_state, &grouped_outboxes).await?;
169 let http_notification_result = HttpNotificationService::send(&app_state, &grouped_outboxes).await?;
170
171 let mut failure_outbox = vec![];
172 failure_outbox.extend(sqs_notification_result.failed);
173 failure_outbox.extend(sns_notification_result.failed);
174 failure_outbox.extend(http_notification_result.failed);
175
176 let successfully_outboxes = outboxes
177 .into_iter()
178 .filter(|it| !failure_outbox.iter().any(|failed| failed.idempotent_key == it.idempotent_key))
179 .collect::<Vec<Outbox>>();
180
181 let mut transaction = app_state.begin_transaction().await?;
182
183 if app_state.delete_after_process_successfully.unwrap_or(false) {
184 OutboxRepository::delete_processed(&app_state, &mut transaction, &successfully_outboxes).await?;
185 } else {
186 OutboxRepository::mark_as_processed(&app_state, &mut transaction, &successfully_outboxes).await?;
187 }
188
189 if !failure_outbox.is_empty() {
190 OutboxRepository::increase_attempts(&app_state, &mut transaction, &failure_outbox).await?;
191 }
192
193 app_state.commit_transaction(transaction).await?;
194
195 Ok(outboxes_len)
196 }
197
198 #[instrument(skip_all)]
199 fn group_by_destination(outboxes: Vec<Outbox>) -> GroupedOutboxed {
200 let mut grouped_outboxes = GroupedOutboxed::default();
201
202 for outbox in outboxes {
203 for destination in outbox.destinations.0.clone() {
204 match destination {
205 OutboxDestination::SqsDestination(sqs) => {
206 grouped_outboxes.sqs.entry(sqs.queue_url).or_insert(vec![]).push(outbox.clone());
207 },
208 OutboxDestination::SnsDestination(sns) => {
209 grouped_outboxes.sns.entry(sns.topic_arn).or_insert(vec![]).push(outbox.clone());
210 },
211 OutboxDestination::HttpDestination(_) => {
212 grouped_outboxes.http.push(outbox.clone());
213 },
214 }
215 }
216 }
217
218 grouped_outboxes
219 }
220}