outbox_pattern_processor/
outbox_private_repository.rs

1use crate::app_state::AppState;
2use crate::error::OutboxPatternProcessorError;
3use crate::outbox::Outbox;
4use crate::outbox_cleaner_schedule::OutboxCleanerSchedule;
5use crate::outbox_repository::OutboxRepository;
6use sqlx::types::chrono::Utc;
7use sqlx::{Postgres, Transaction};
8use std::time::Duration;
9use tracing::instrument;
10use uuid::Uuid;
11
12impl OutboxRepository {
13    #[instrument(skip_all, name = "lock_and_get_outboxes")]
14    pub async fn list(app_state: &AppState) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
15        let lock_id = Uuid::now_v7();
16        let processing_until_incremente_interval = format!("{} seconds", app_state.max_in_flight_interval_in_seconds.unwrap_or(30));
17
18        Self::lock_partition_key(&app_state, lock_id, processing_until_incremente_interval).await?;
19
20        Self::get_outboxes_ranked_from_locked_partition_key(&app_state, lock_id).await
21    }
22
23    #[instrument(skip_all)]
24    async fn get_outboxes_ranked_from_locked_partition_key(
25        app_state: &&AppState,
26        lock_id: Uuid,
27    ) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
28        let sql_list = r#"with locked as (
29    select
30        o.idempotent_key,
31        row_number() over (partition by o.partition_key order by o.process_after asc) as rnk
32    from outbox o
33    inner join outbox_lock ol on o.partition_key = ol.partition_key
34    where o.process_after < now()
35        and o.processed_at is null
36        and o.attempts < $2
37        and ol.lock_id = $1
38)
39select o.*
40from outbox o
41inner join locked l on o.idempotent_key = l.idempotent_key
42where l.rnk = 1"#;
43
44        sqlx::query_as(sql_list)
45            .bind(lock_id)
46            .bind(app_state.outbox_failure_limit.unwrap_or(10) as i32)
47            .fetch_all(&app_state.postgres_pool)
48            .await
49            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to list outboxes"))
50    }
51
52    #[instrument(skip_all)]
53    async fn lock_partition_key(
54        app_state: &&AppState,
55        lock_id: Uuid,
56        processing_until_incremente_interval: String,
57    ) -> Result<(), OutboxPatternProcessorError> {
58        let sql_lock = r#"insert into outbox_lock (partition_key, lock_id, processing_until)
59(
60    select o.partition_key, $1 as lock_id, now() + ($3)::interval as processing_until
61    from outbox o
62    left join outbox_lock ol on o.partition_key = ol.partition_key and ol.processed_at is null
63    where ol.partition_key is null
64        and o.processed_at is null
65        and o.process_after < now()
66        and o.attempts < $4
67    group by o.partition_key
68    order by min(o.process_after)
69    limit $2
70)
71ON CONFLICT DO NOTHING"#;
72
73        sqlx::query(sql_lock)
74            .bind(lock_id)
75            .bind(app_state.outbox_query_limit.unwrap_or(50) as i32)
76            .bind(processing_until_incremente_interval)
77            .bind(app_state.outbox_failure_limit.unwrap_or(10) as i32)
78            .execute(&app_state.postgres_pool)
79            .await
80            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to lock outboxes"))?;
81        Ok(())
82    }
83
84    #[instrument(skip_all)]
85    pub async fn mark_as_processed(
86        app_state: &AppState,
87        transaction: &mut Transaction<'_, Postgres>,
88        outboxes: &[Outbox],
89    ) -> Result<(), OutboxPatternProcessorError> {
90        let sql = "update outbox set processed_at = now(), attempts = attempts + 1 where idempotent_key = ANY($1)";
91
92        let mut ids_to_update = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
93        ids_to_update.push(Uuid::now_v7());
94
95        sqlx::query(sql)
96            .bind(ids_to_update)
97            .execute(&mut **transaction)
98            .await
99            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to mark outboxes as processed"))?;
100
101        Self::unlock(app_state, transaction, outboxes).await?;
102
103        Ok(())
104    }
105
106    #[instrument(skip_all)]
107    pub async fn delete_processed(
108        app_state: &AppState,
109        transaction: &mut Transaction<'_, Postgres>,
110        outboxes: &[Outbox],
111    ) -> Result<(), OutboxPatternProcessorError> {
112        let sql = "delete from outbox where idempotent_key = ANY($1)";
113
114        let mut ids_to_delete = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
115        ids_to_delete.push(Uuid::now_v7());
116
117        sqlx::query(sql)
118            .bind(ids_to_delete)
119            .execute(&mut **transaction)
120            .await
121            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to delete processed outboxes"))?;
122
123        Self::unlock(app_state, transaction, outboxes).await?;
124
125        Ok(())
126    }
127
128    #[instrument(skip_all)]
129    pub async fn increase_attempts(
130        app_state: &AppState,
131        transaction: &mut Transaction<'_, Postgres>,
132        outboxes: &[Outbox],
133    ) -> Result<(), OutboxPatternProcessorError> {
134        if let Some(delay) = app_state.delay_for_failure_attempt_in_seconds {
135            if delay > 0 {
136                let sql = "update outbox set process_after = $2 where partition_key = ANY($1) and processed_at is null";
137
138                sqlx::query(sql)
139                    .bind(outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>())
140                    .bind(Utc::now() + Duration::from_secs(delay))
141                    .execute(&mut **transaction)
142                    .await
143                    .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to increase attempts"))?;
144            }
145        }
146
147        let sql = "update outbox set attempts = attempts + 1 where idempotent_key = ANY($1)";
148
149        sqlx::query(sql)
150            .bind(outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>())
151            .execute(&mut **transaction)
152            .await
153            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to increase attempts"))?;
154
155        Self::unlock(app_state, transaction, outboxes).await?;
156
157        Ok(())
158    }
159
160    #[instrument(skip_all)]
161    async fn unlock(
162        app_state: &AppState,
163        transaction: &mut Transaction<'_, Postgres>,
164        outboxes: &[Outbox],
165    ) -> Result<(), OutboxPatternProcessorError> {
166        Self::unlock_by_partition_key(app_state, transaction, outboxes).await?;
167        Self::unlock_expired_processes(app_state, transaction, outboxes).await?;
168
169        Ok(())
170    }
171
172    #[instrument(skip_all)]
173    async fn unlock_expired_processes(
174        app_state: &AppState,
175        transaction: &mut Transaction<'_, Postgres>,
176        outboxes: &[Outbox],
177    ) -> Result<(), OutboxPatternProcessorError> {
178        let sql_unlock_by_processing_until = if app_state.scheduled_clear_locked_partition.unwrap_or(false) {
179            "update outbox_lock set processed_at = now() where processing_until < now()"
180        } else {
181            "delete from outbox_lock where processing_until < now()"
182        };
183
184        let mut ids_to_delete = outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>();
185        ids_to_delete.push(Uuid::now_v7());
186
187        sqlx::query(sql_unlock_by_processing_until)
188            .bind(ids_to_delete)
189            .execute(&mut **transaction)
190            .await
191            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;
192        Ok(())
193    }
194
195    #[instrument(skip_all)]
196    async fn unlock_by_partition_key(
197        app_state: &AppState,
198        transaction: &mut Transaction<'_, Postgres>,
199        outboxes: &[Outbox],
200    ) -> Result<(), OutboxPatternProcessorError> {
201        let sql_unlock_by_partition_key = if app_state.scheduled_clear_locked_partition.unwrap_or(false) {
202            "update outbox_lock set processed_at = now() where partition_key = ANY($1) and processed_at is null"
203        } else {
204            "delete from outbox_lock where partition_key = ANY($1) and processed_at is null"
205        };
206
207        let mut ids_to_delete = outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>();
208        ids_to_delete.push(Uuid::now_v7());
209
210        sqlx::query(sql_unlock_by_partition_key)
211            .bind(ids_to_delete)
212            .execute(&mut **transaction)
213            .await
214            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;
215        Ok(())
216    }
217
218    #[instrument(skip_all)]
219    pub async fn clear_processed_locked_partition_key(transaction: &mut Transaction<'_, Postgres>) -> Result<(), OutboxPatternProcessorError> {
220        let sql = "delete from outbox_lock where processed_at is not null and processed_at < now()";
221
222        sqlx::query(sql)
223            .execute(&mut **transaction)
224            .await
225            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to clear processed locks"))?;
226
227        Ok(())
228    }
229
230    #[instrument(skip_all)]
231    pub async fn find_cleaner_schedule(transaction: &mut Transaction<'_, Postgres>) -> Result<Option<OutboxCleanerSchedule>, OutboxPatternProcessorError> {
232        let sql = "select * from outbox_cleaner_schedule limit 1 for update skip locked";
233
234        sqlx::query_as(sql)
235            .fetch_optional(&mut **transaction)
236            .await
237            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to update last cleaner execution"))
238    }
239
240    #[instrument(skip_all)]
241    pub async fn update_last_cleaner_execution(transaction: &mut Transaction<'_, Postgres>) -> Result<(), OutboxPatternProcessorError> {
242        let sql = "update outbox_cleaner_schedule set last_execution = now()";
243
244        sqlx::query(sql)
245            .execute(&mut **transaction)
246            .await
247            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to update last cleaner execution"))?;
248
249        Ok(())
250    }
251}