// outbox_pattern_processor/outbox_private_repository.rs

use crate::app_state::AppState;
use crate::error::OutboxPatternProcessorError;
use crate::outbox::Outbox;
use crate::outbox_cleaner_schedule::OutboxCleanerSchedule;
use crate::outbox_repository::OutboxRepository;
use sqlx::{Postgres, Transaction};
use tracing::instrument;
use uuid::Uuid;

impl OutboxRepository {
    /// Locks a batch of partition keys and returns the outboxes selected from them.
    ///
    /// A fresh UUID v7 identifies this locking round. Locks are requested for
    /// `max_in_flight_interval_in_seconds` seconds (30 when the setting is absent), and the
    /// listing step only considers partitions locked under this round's id.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the locking step or by the listing step.
    #[instrument(skip_all, name = "lock_and_get_outboxes")]
    pub async fn list(app_state: &AppState) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
        let in_flight_seconds = app_state.max_in_flight_interval_in_seconds.unwrap_or(30);
        // Rendered as text because the SQL casts this value to a Postgres `interval`.
        let lock_interval = format!("{} seconds", in_flight_seconds);
        let round_lock_id = Uuid::now_v7();

        Self::lock_partition_key(&app_state, round_lock_id, lock_interval).await?;

        Self::get_outboxes_ranked_from_locked_partition_key(&app_state, round_lock_id).await
    }

    /// Returns one outbox per partition key locked under `lock_id`.
    ///
    /// Within each locked partition, candidate rows are those with `process_after < now()`,
    /// `processed_at is null` and `attempts` below `outbox_failure_limit` (10 when unset). They are
    /// ranked by ascending `process_after`, and only the first-ranked row of each partition is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns an `OutboxPatternProcessorError` with the message "Failed to list outboxes" when the
    /// query fails.
    #[instrument(skip_all)]
    async fn get_outboxes_ranked_from_locked_partition_key(
        app_state: &AppState,
        lock_id: Uuid,
    ) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
        let sql_list = r#"with locked as (
    select
        o.idempotent_key,
        row_number() over (partition by o.partition_key order by o.process_after asc) as rnk
    from outbox o
    inner join outbox_lock ol on o.partition_key = ol.partition_key
    where o.process_after < now()
        and o.processed_at is null
        and o.attempts < $2
        and ol.lock_id = $1
)
select o.*
from outbox o
inner join locked l on o.idempotent_key = l.idempotent_key
where l.rnk = 1"#;

        // NOTE(review): `as i32` truncates silently if the configured limit does not fit in an
        // i32 — confirm the field type in `AppState`.
        let failure_limit = app_state.outbox_failure_limit.unwrap_or(10) as i32;

        sqlx::query_as(sql_list)
            .bind(lock_id)
            .bind(failure_limit)
            .fetch_all(&app_state.postgres_pool)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to list outboxes"))
    }

    /// Inserts `outbox_lock` rows, tagged with `lock_id`, for partition keys that have pending work.
    ///
    /// A partition key is eligible when it has at least one outbox with `processed_at is null`,
    /// `process_after < now()` and `attempts` below `outbox_failure_limit` (10 when unset), and it
    /// has no `outbox_lock` row whose `processed_at` is null. Eligible keys are ordered by their
    /// earliest `process_after` and capped at `outbox_query_limit` (50 when unset). Each new lock
    /// expires at `now()` plus `processing_until_increment_interval`, which the SQL casts to a
    /// Postgres `interval`.
    ///
    /// Rows that would conflict with existing ones are skipped (`ON CONFLICT DO NOTHING`).
    ///
    /// # Errors
    ///
    /// Returns an `OutboxPatternProcessorError` with the message "Failed to lock outboxes" when the
    /// insert fails.
    #[instrument(skip_all)]
    async fn lock_partition_key(
        app_state: &AppState,
        lock_id: Uuid,
        processing_until_increment_interval: String,
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql_lock = r#"insert into outbox_lock (partition_key, lock_id, processing_until)
(
    select o.partition_key, $1 as lock_id, now() + ($3)::interval as processing_until
    from outbox o
    left join outbox_lock ol on o.partition_key = ol.partition_key and ol.processed_at is null
    where ol.partition_key is null
        and o.processed_at is null
        and o.process_after < now()
        and o.attempts < $4
    group by o.partition_key
    order by min(o.process_after)
    limit $2
)
ON CONFLICT DO NOTHING"#;

        // NOTE(review): both `as i32` casts truncate silently if the configured values do not fit
        // in an i32 — confirm the field types in `AppState`.
        let query_limit = app_state.outbox_query_limit.unwrap_or(50) as i32;
        let failure_limit = app_state.outbox_failure_limit.unwrap_or(10) as i32;

        sqlx::query(sql_lock)
            .bind(lock_id)
            .bind(query_limit)
            .bind(processing_until_increment_interval)
            .bind(failure_limit)
            .execute(&app_state.postgres_pool)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to lock outboxes"))?;
        Ok(())
    }

    #[instrument(skip_all)]
    pub async fn mark_as_processed(
        app_state: &AppState,
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = "update outbox set processed_at = now(), attempts = attempts + 1 where idempotent_key = ANY($1)";

        let mut ids_to_update = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
        ids_to_update.push(Uuid::now_v7());

        sqlx::query(sql)
            .bind(ids_to_update)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to mark outboxes as processed"))?;

        Self::unlock(app_state, transaction, outboxes).await?;

        Ok(())
    }

    #[instrument(skip_all)]
    pub async fn delete_processed(
        app_state: &AppState,
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = "delete from outbox where idempotent_key = ANY($1)";

        let mut ids_to_delete = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
        ids_to_delete.push(Uuid::now_v7());

        sqlx::query(sql)
            .bind(ids_to_delete)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to delete processed outboxes"))?;

        Self::unlock(app_state, transaction, outboxes).await?;

        Ok(())
    }

    #[instrument(skip_all)]
    pub async fn increase_attempts(
        app_state: &AppState,
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = "update outbox set attempts = attempts + 1 where idempotent_key = ANY($1)";

        sqlx::query(sql)
            .bind(outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>())
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to increase attempts"))?;

        Self::unlock(app_state, transaction, outboxes).await?;

        Ok(())
    }

    /// Releases partition locks in two steps on the supplied transaction.
    ///
    /// First `unlock_by_partition_key` is called, then `unlock_expired_processes`. The second
    /// step only runs when the first succeeds.
    ///
    /// # Errors
    ///
    /// Propagates the error from whichever step fails first.
    #[instrument(skip_all)]
    async fn unlock(
        app_state: &AppState,
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        Self::unlock_by_partition_key(app_state, transaction, outboxes).await?;

        Self::unlock_expired_processes(app_state, transaction, outboxes).await
    }

    /// Releases every lock whose `processing_until` deadline has already passed.
    ///
    /// When `scheduled_clear_locked_partition` is enabled, expired lock rows are stamped with
    /// `processed_at = now()`; otherwise (including when the setting is absent) they are deleted.
    ///
    /// The statement is driven only by `processing_until < now()` and contains no placeholders,
    /// so nothing is bound to it. `_outboxes` is not read; the parameter is kept so the signature
    /// stays the one `unlock` calls.
    ///
    /// # Errors
    ///
    /// Returns an `OutboxPatternProcessorError` with the message "Failed to unlock partition keys"
    /// when the statement fails.
    #[instrument(skip_all)]
    async fn unlock_expired_processes(
        app_state: &AppState,
        transaction: &mut Transaction<'_, Postgres>,
        _outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql_unlock_by_processing_until = if app_state.scheduled_clear_locked_partition.unwrap_or(false) {
            "update outbox_lock set processed_at = now() where processing_until < now()"
        } else {
            "delete from outbox_lock where processing_until < now()"
        };

        sqlx::query(sql_unlock_by_processing_until)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;
        Ok(())
    }

    #[instrument(skip_all)]
    async fn unlock_by_partition_key(
        app_state: &AppState,
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql_unlock_by_partition_key = if app_state.scheduled_clear_locked_partition.unwrap_or(false) {
            "update outbox_lock set processed_at = now() where partition_key = ANY($1) and processed_at is null"
        } else {
            "delete from outbox_lock where partition_key = ANY($1) and processed_at is null"
        };

        let mut ids_to_delete = outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>();
        ids_to_delete.push(Uuid::now_v7());

        sqlx::query(sql_unlock_by_partition_key)
            .bind(ids_to_delete)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;
        Ok(())
    }

    #[instrument(skip_all)]
    pub async fn clear_processed_locked_partition_key(transaction: &mut Transaction<'_, Postgres>) -> Result<(), OutboxPatternProcessorError> {
        let sql = "delete from outbox_lock where processed_at is not null and processed_at < now()";

        sqlx::query(sql)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to clear processed locks"))?;

        Ok(())
    }

    #[instrument(skip_all)]
    pub async fn find_cleaner_schedule(transaction: &mut Transaction<'_, Postgres>) -> Result<Option<OutboxCleanerSchedule>, OutboxPatternProcessorError> {
        let sql = "select * from outbox_cleaner_schedule limit 1 for update skip locked";

        sqlx::query_as(sql)
            .fetch_optional(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to update last cleaner execution"))
    }

    #[instrument(skip_all)]
    pub async fn update_last_cleaner_execution(transaction: &mut Transaction<'_, Postgres>) -> Result<(), OutboxPatternProcessorError> {
        let sql = "update outbox_cleaner_schedule set last_execution = now()";

        sqlx::query(sql)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to update last cleaner execution"))?;

        Ok(())
    }
}