outbox_pattern_processor/
outbox_repository.rs1use crate::error::OutboxPatternProcessorError;
2use crate::outbox::Outbox;
3use sqlx::PgConnection;
4use tracing::instrument;
5
/// Unit struct grouping the associated functions that insert `Outbox` records into the `outbox` table.
6pub struct OutboxRepository;
7
8impl OutboxRepository {
9 #[instrument(skip_all)]
10 pub async fn insert(
11 db_conn: &mut PgConnection,
12 outbox: Outbox,
13 ) -> Result<Outbox, OutboxPatternProcessorError> {
14 let outboxes = Self::insert_all(db_conn, vec![outbox]).await?;
15 Ok(outboxes[0].clone())
16 }
17
18 pub async fn insert_all(
19 db_conn: &mut PgConnection,
20 outboxes: Vec<Outbox>,
21 ) -> Result<Vec<Outbox>, OutboxPatternProcessorError> {
22 if outboxes.is_empty() {
23 return Ok(vec![]);
24 }
25
26 let sql = r#"
27 INSERT INTO outbox
28 (idempotent_key, partition_key, destinations, headers, payload, created_at, process_after)
29 "#;
30
31 let mut query_builder = sqlx::QueryBuilder::new(sql);
32
33 query_builder.push_values(outboxes.clone(), |mut b, outbox: Outbox| {
34 b.push_bind(outbox.idempotent_key)
35 .push_bind(outbox.partition_key)
36 .push_bind(outbox.destinations)
37 .push_bind(outbox.headers)
38 .push_bind(outbox.payload)
39 .push_bind(outbox.created_at)
40 .push_bind(outbox.process_after.unwrap_or(outbox.created_at));
41 });
42
43 query_builder.build().execute(&mut *db_conn).await.map_err(|error| {
44 OutboxPatternProcessorError::new(
45 &error.to_string(),
46 &format!(
47 "Failed to insert outboxes to partition_keys={}",
48 outboxes.iter().map(|it| it.partition_key.to_string()).collect::<Vec<_>>().join(", ")
49 ),
50 )
51 })?;
52
53 Ok(outboxes)
54 }
55}