outbox_pattern_processor/
outbox_repository.rs

1use crate::error::OutboxPatternProcessorError;
2use crate::outbox::Outbox;
3use sqlx::PgConnection;
4use tracing::instrument;
5
6pub struct OutboxRepository;
7
8impl OutboxRepository {
9    #[instrument(skip_all)]
10    pub async fn insert(
11        db_conn: &mut PgConnection,
12        outbox: Outbox,
13    ) -> Result<Outbox, OutboxPatternProcessorError> {
14        let outboxes = Self::insert_all(db_conn, vec![outbox]).await?;
15        Ok(outboxes[0].clone())
16    }
17
18    pub async fn insert_all(
19        db_conn: &mut PgConnection,
20        outboxes: Vec<Outbox>,
21    ) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
22        if outboxes.is_empty() {
23            return Ok(vec![]);
24        }
25
26        let sql = r#"
27            INSERT INTO outbox
28                (idempotent_key, partition_key, destinations, headers, payload, created_at, process_after)
29        "#;
30
31        let mut query_builder = sqlx::QueryBuilder::new(sql);
32
33        query_builder.push_values(outboxes.clone(), |mut b, outbox: Outbox| {
34            b.push_bind(outbox.idempotent_key)
35                .push_bind(outbox.partition_key)
36                .push_bind(outbox.destinations)
37                .push_bind(outbox.headers)
38                .push_bind(outbox.payload)
39                .push_bind(outbox.created_at)
40                .push_bind(outbox.process_after.unwrap_or(outbox.created_at));
41        });
42
43        query_builder.build().execute(&mut *db_conn).await.map_err(|error| {
44            OutboxPatternProcessorError::new(
45                &error.to_string(),
46                &format!(
47                    "Failed to insert outboxes to partition_keys={}",
48                    outboxes.iter().map(|it| it.partition_key.to_string()).collect::<Vec<_>>().join(", ")
49                ),
50            )
51        })?;
52
53        Ok(outboxes)
54    }
55}