outbox_pattern_processor/
outbox_private_repository.rs1use crate::app_state::AppState;
2use crate::error::OutboxPatternProcessorError;
3use crate::outbox::Outbox;
4use crate::outbox_cleaner_schedule::OutboxCleanerSchedule;
5use crate::outbox_repository::OutboxRepository;
6use sqlx::types::chrono::Utc;
7use sqlx::{Postgres, Transaction};
8use std::time::Duration;
9use tracing::instrument;
10use uuid::Uuid;
11
12impl OutboxRepository {
13 #[instrument(skip_all, name = "lock_and_get_outboxes")]
14 pub async fn list(app_state: &AppState) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
15 let lock_id = Uuid::now_v7();
16 let processing_until_incremente_interval = format!("{} seconds", app_state.max_in_flight_interval_in_seconds.unwrap_or(30));
17
18 Self::lock_partition_key(&app_state, lock_id, processing_until_incremente_interval).await?;
19
20 Self::get_outboxes_ranked_from_locked_partition_key(&app_state, lock_id).await
21 }
22
23 #[instrument(skip_all)]
24 async fn get_outboxes_ranked_from_locked_partition_key(
25 app_state: &&AppState,
26 lock_id: Uuid,
27 ) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
28 let sql_list = r#"with locked as (
29 select
30 o.idempotent_key,
31 row_number() over (partition by o.partition_key order by o.process_after asc) as rnk
32 from outbox o
33 inner join outbox_lock ol on o.partition_key = ol.partition_key
34 where o.process_after < now()
35 and o.processed_at is null
36 and o.attempts < $2
37 and ol.lock_id = $1
38)
39select o.*
40from outbox o
41inner join locked l on o.idempotent_key = l.idempotent_key
42where l.rnk = 1"#;
43
44 sqlx::query_as(sql_list)
45 .bind(lock_id)
46 .bind(app_state.outbox_failure_limit.unwrap_or(10) as i32)
47 .fetch_all(&app_state.postgres_pool)
48 .await
49 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to list outboxes"))
50 }
51
52 #[instrument(skip_all)]
53 async fn lock_partition_key(
54 app_state: &&AppState,
55 lock_id: Uuid,
56 processing_until_incremente_interval: String,
57 ) -> Result<(), OutboxPatternProcessorError> {
58 let sql_lock = r#"insert into outbox_lock (partition_key, lock_id, processing_until)
59(
60 select o.partition_key, $1 as lock_id, now() + ($3)::interval as processing_until
61 from outbox o
62 left join outbox_lock ol on o.partition_key = ol.partition_key and ol.processed_at is null
63 where ol.partition_key is null
64 and o.processed_at is null
65 and o.process_after < now()
66 and o.attempts < $4
67 group by o.partition_key
68 order by min(o.process_after)
69 limit $2
70)
71ON CONFLICT DO NOTHING"#;
72
73 sqlx::query(sql_lock)
74 .bind(lock_id)
75 .bind(app_state.outbox_query_limit.unwrap_or(50) as i32)
76 .bind(processing_until_incremente_interval)
77 .bind(app_state.outbox_failure_limit.unwrap_or(10) as i32)
78 .execute(&app_state.postgres_pool)
79 .await
80 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to lock outboxes"))?;
81 Ok(())
82 }
83
84 #[instrument(skip_all)]
85 pub async fn mark_as_processed(
86 app_state: &AppState,
87 transaction: &mut Transaction<'_, Postgres>,
88 outboxes: &[Outbox],
89 ) -> Result<(), OutboxPatternProcessorError> {
90 let sql = "update outbox set processed_at = now(), attempts = attempts + 1 where idempotent_key = ANY($1)";
91
92 let mut ids_to_update = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
93 ids_to_update.push(Uuid::now_v7());
94
95 sqlx::query(sql)
96 .bind(ids_to_update)
97 .execute(&mut **transaction)
98 .await
99 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to mark outboxes as processed"))?;
100
101 Self::unlock(app_state, transaction, outboxes).await?;
102
103 Ok(())
104 }
105
106 #[instrument(skip_all)]
107 pub async fn delete_processed(
108 app_state: &AppState,
109 transaction: &mut Transaction<'_, Postgres>,
110 outboxes: &[Outbox],
111 ) -> Result<(), OutboxPatternProcessorError> {
112 let sql = "delete from outbox where idempotent_key = ANY($1)";
113
114 let mut ids_to_delete = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
115 ids_to_delete.push(Uuid::now_v7());
116
117 sqlx::query(sql)
118 .bind(ids_to_delete)
119 .execute(&mut **transaction)
120 .await
121 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to delete processed outboxes"))?;
122
123 Self::unlock(app_state, transaction, outboxes).await?;
124
125 Ok(())
126 }
127
128 #[instrument(skip_all)]
129 pub async fn increase_attempts(
130 app_state: &AppState,
131 transaction: &mut Transaction<'_, Postgres>,
132 outboxes: &[Outbox],
133 ) -> Result<(), OutboxPatternProcessorError> {
134 if let Some(delay) = app_state.delay_for_failure_attempt_in_seconds {
135 if delay > 0 {
136 let sql = "update outbox set process_after = $2 where partition_key = ANY($1) and processed_at is null";
137
138 sqlx::query(sql)
139 .bind(outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>())
140 .bind(Utc::now() + Duration::from_secs(delay))
141 .execute(&mut **transaction)
142 .await
143 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to increase attempts"))?;
144 }
145 }
146
147 let sql = "update outbox set attempts = attempts + 1 where idempotent_key = ANY($1)";
148
149 sqlx::query(sql)
150 .bind(outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>())
151 .execute(&mut **transaction)
152 .await
153 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to increase attempts"))?;
154
155 Self::unlock(app_state, transaction, outboxes).await?;
156
157 Ok(())
158 }
159
160 #[instrument(skip_all)]
161 async fn unlock(
162 app_state: &AppState,
163 transaction: &mut Transaction<'_, Postgres>,
164 outboxes: &[Outbox],
165 ) -> Result<(), OutboxPatternProcessorError> {
166 Self::unlock_by_partition_key(app_state, transaction, outboxes).await?;
167 Self::unlock_expired_processes(app_state, transaction, outboxes).await?;
168
169 Ok(())
170 }
171
172 #[instrument(skip_all)]
173 async fn unlock_expired_processes(
174 app_state: &AppState,
175 transaction: &mut Transaction<'_, Postgres>,
176 outboxes: &[Outbox],
177 ) -> Result<(), OutboxPatternProcessorError> {
178 let sql_unlock_by_processing_until = if app_state.scheduled_clear_locked_partition.unwrap_or(false) {
179 "update outbox_lock set processed_at = now() where processing_until < now()"
180 } else {
181 "delete from outbox_lock where processing_until < now()"
182 };
183
184 let mut ids_to_delete = outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>();
185 ids_to_delete.push(Uuid::now_v7());
186
187 sqlx::query(sql_unlock_by_processing_until)
188 .bind(ids_to_delete)
189 .execute(&mut **transaction)
190 .await
191 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;
192 Ok(())
193 }
194
195 #[instrument(skip_all)]
196 async fn unlock_by_partition_key(
197 app_state: &AppState,
198 transaction: &mut Transaction<'_, Postgres>,
199 outboxes: &[Outbox],
200 ) -> Result<(), OutboxPatternProcessorError> {
201 let sql_unlock_by_partition_key = if app_state.scheduled_clear_locked_partition.unwrap_or(false) {
202 "update outbox_lock set processed_at = now() where partition_key = ANY($1) and processed_at is null"
203 } else {
204 "delete from outbox_lock where partition_key = ANY($1) and processed_at is null"
205 };
206
207 let mut ids_to_delete = outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>();
208 ids_to_delete.push(Uuid::now_v7());
209
210 sqlx::query(sql_unlock_by_partition_key)
211 .bind(ids_to_delete)
212 .execute(&mut **transaction)
213 .await
214 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;
215 Ok(())
216 }
217
218 #[instrument(skip_all)]
219 pub async fn clear_processed_locked_partition_key(transaction: &mut Transaction<'_, Postgres>) -> Result<(), OutboxPatternProcessorError> {
220 let sql = "delete from outbox_lock where processed_at is not null and processed_at < now()";
221
222 sqlx::query(sql)
223 .execute(&mut **transaction)
224 .await
225 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to clear processed locks"))?;
226
227 Ok(())
228 }
229
230 #[instrument(skip_all)]
231 pub async fn find_cleaner_schedule(transaction: &mut Transaction<'_, Postgres>) -> Result<Option<OutboxCleanerSchedule>, OutboxPatternProcessorError> {
232 let sql = "select * from outbox_cleaner_schedule limit 1 for update skip locked";
233
234 sqlx::query_as(sql)
235 .fetch_optional(&mut **transaction)
236 .await
237 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to update last cleaner execution"))
238 }
239
240 #[instrument(skip_all)]
241 pub async fn update_last_cleaner_execution(transaction: &mut Transaction<'_, Postgres>) -> Result<(), OutboxPatternProcessorError> {
242 let sql = "update outbox_cleaner_schedule set last_execution = now()";
243
244 sqlx::query(sql)
245 .execute(&mut **transaction)
246 .await
247 .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to update last cleaner execution"))?;
248
249 Ok(())
250 }
251}