force_sync/store/pg/
dead_letter.rs1use serde_json::Value;
4use tokio_postgres::GenericClient;
5
6use crate::error::ForceSyncError;
7
8use super::PgStore;
9
10#[allow(clippy::derive_partial_eq_without_eq)]
12#[derive(Debug, Clone, PartialEq)]
13pub struct DeadLetter {
14 pub task_id: Option<i64>,
16 pub tenant: Option<String>,
18 pub object_name: Option<String>,
20 pub external_id: Option<String>,
22 pub error_message: String,
24 pub payload: Option<Value>,
26}
27
28async fn insert_dead_letter_query<C>(
29 client: &C,
30 dead_letter: &DeadLetter,
31) -> Result<i64, ForceSyncError>
32where
33 C: GenericClient + Sync + ?Sized,
34{
35 let row = client
36 .query_one(
37 "insert into sync_dead_letter (
38 task_id,
39 tenant,
40 object_name,
41 external_id,
42 error_message,
43 payload
44 ) values ($1, $2, $3, $4, $5, $6::jsonb)
45 returning dead_letter_id",
46 &[
47 &dead_letter.task_id,
48 &dead_letter.tenant,
49 &dead_letter.object_name,
50 &dead_letter.external_id,
51 &dead_letter.error_message,
52 &dead_letter.payload,
53 ],
54 )
55 .await?;
56
57 Ok(row.get(0))
58}
59
60pub async fn insert_dead_letter_in_tx<C>(
66 client: &C,
67 dead_letter: &DeadLetter,
68) -> Result<i64, ForceSyncError>
69where
70 C: GenericClient + Sync + ?Sized,
71{
72 insert_dead_letter_query(client, dead_letter).await
73}
74
75impl PgStore {
76 pub async fn insert_dead_letter(
82 &self,
83 dead_letter: &DeadLetter,
84 ) -> Result<i64, ForceSyncError> {
85 let client = self.pool().get().await?;
86 insert_dead_letter_query(&**client, dead_letter).await
87 }
88}