outbox_pattern_processor/
outbox_private_repository.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use crate::app_state::AppState;
use crate::error::OutboxPatternProcessorError;
use crate::outbox::Outbox;
use crate::outbox_repository::OutboxRepository;
use sqlx::{Postgres, Transaction};
use uuid::Uuid;

impl OutboxRepository {
    pub async fn list(app_state: &AppState) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
        let lock_id = Uuid::now_v7();

        let processing_until_incremente_interval = format!("{} seconds", app_state.max_in_flight_interval_in_seconds.unwrap_or(30));

        let sql_lock = r#"
        insert into outbox_lock
        (
            select o.partition_key, $1 as lock_id, now() + ($3)::interval as processing_until
            from outbox o
            left join outbox_lock ol on o.partition_key = ol.partition_key
            where ol.partition_key is null and o.processed_at is null and o.process_after < now() and o.attempts < $4
            group by o.partition_key
            order by min(o.process_after)
            limit $2
        )
        ON CONFLICT DO NOTHING
        "#;

        sqlx::query(sql_lock)
            .bind(lock_id)
            .bind(app_state.outbox_query_limit.unwrap_or(50) as i32)
            .bind(processing_until_incremente_interval)
            .bind(app_state.outbox_failure_limit.unwrap_or(10) as i32)
            .execute(&app_state.postgres_pool)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to lock outboxes"))?;

        let sql_list = r#"
        with locked as (
            select
                o.idempotent_key,
                row_number() over (partition by o.partition_key order by o.process_after asc) as rnk
            from outbox o
            inner join outbox_lock ol on o.partition_key = ol.partition_key and o.process_after < now() and o.processed_at is null and o.attempts < $2
            where ol.lock_id = $1
        )
        select o.*
        from outbox o
        inner join locked l on o.idempotent_key = l.idempotent_key and l.rnk = 1
        "#;

        sqlx::query_as(sql_list)
            .bind(lock_id)
            .bind(app_state.outbox_failure_limit.unwrap_or(10) as i32)
            .fetch_all(&app_state.postgres_pool)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to list outboxes"))
    }

    pub async fn mark_as_processed(
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = r#"
        update outbox
        set processed_at = now(),
            attempts = attempts + 1
        where idempotent_key = ANY($1)
        "#;

        let mut ids_to_update = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
        ids_to_update.push(Uuid::now_v7());

        sqlx::query(sql)
            .bind(ids_to_update)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to mark outboxes as processed"))?;

        Self::unlock_partition_key(transaction, outboxes).await?;

        Ok(())
    }

    pub async fn delete_processed(
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = r#"
        delete from outbox
        where idempotent_key = ANY($1)
        "#;

        let mut ids_to_delete = outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>();
        ids_to_delete.push(Uuid::now_v7());

        sqlx::query(sql)
            .bind(ids_to_delete)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to delete processed outboxes"))?;

        Self::unlock_partition_key(transaction, outboxes).await?;

        Ok(())
    }

    pub async fn increase_attempts(
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = r#"
        update outbox
        set attempts = attempts + 1
        where idempotent_key = ANY($1)
        "#;

        sqlx::query(sql)
            .bind(outboxes.iter().map(|it| it.idempotent_key).collect::<Vec<Uuid>>())
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to increase attempts"))?;

        Self::unlock_partition_key(transaction, outboxes).await?;

        Ok(())
    }

    async fn unlock_partition_key(
        transaction: &mut Transaction<'_, Postgres>,
        outboxes: &[Outbox],
    ) -> Result<(), OutboxPatternProcessorError> {
        let sql = r#"
        delete from outbox_lock
        where partition_key = ANY($1) or processing_until < now()
        "#;

        let mut ids_to_delete = outboxes.iter().map(|it| it.partition_key).collect::<Vec<Uuid>>();
        ids_to_delete.push(Uuid::now_v7());

        sqlx::query(sql)
            .bind(ids_to_delete)
            .execute(&mut **transaction)
            .await
            .map_err(|error| OutboxPatternProcessorError::new(&error.to_string(), "Failed to unlock partition keys"))?;

        Ok(())
    }
}