Skip to main content

force_sync/store/pg/
dead_letter.rs

1//! Dead-letter repository helpers for the `PostgreSQL` sync store.
2
3use serde_json::Value;
4use tokio_postgres::GenericClient;
5
6use crate::error::ForceSyncError;
7
8use super::PgStore;
9
10/// Dead-letter row captured for operator review.
11#[allow(clippy::derive_partial_eq_without_eq)]
12#[derive(Debug, Clone, PartialEq)]
13pub struct DeadLetter {
14    /// Optional task identifier.
15    pub task_id: Option<i64>,
16    /// Optional tenant identifier.
17    pub tenant: Option<String>,
18    /// Optional Salesforce object name.
19    pub object_name: Option<String>,
20    /// Optional canonical external ID.
21    pub external_id: Option<String>,
22    /// Terminal error message.
23    pub error_message: String,
24    /// Optional payload captured at failure time.
25    pub payload: Option<Value>,
26}
27
28async fn insert_dead_letter_query<C>(
29    client: &C,
30    dead_letter: &DeadLetter,
31) -> Result<i64, ForceSyncError>
32where
33    C: GenericClient + Sync + ?Sized,
34{
35    let row = client
36        .query_one(
37            "insert into sync_dead_letter (
38                task_id,
39                tenant,
40                object_name,
41                external_id,
42                error_message,
43                payload
44            ) values ($1, $2, $3, $4, $5, $6::jsonb)
45            returning dead_letter_id",
46            &[
47                &dead_letter.task_id,
48                &dead_letter.tenant,
49                &dead_letter.object_name,
50                &dead_letter.external_id,
51                &dead_letter.error_message,
52                &dead_letter.payload,
53            ],
54        )
55        .await?;
56
57    Ok(row.get(0))
58}
59
60/// Inserts a dead-letter row inside an existing transaction or client session.
61///
62/// # Errors
63///
64/// Returns an error if the database write fails.
65pub async fn insert_dead_letter_in_tx<C>(
66    client: &C,
67    dead_letter: &DeadLetter,
68) -> Result<i64, ForceSyncError>
69where
70    C: GenericClient + Sync + ?Sized,
71{
72    insert_dead_letter_query(client, dead_letter).await
73}
74
75impl PgStore {
76    /// Inserts a dead-letter row and returns its database identifier.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the database write fails.
81    pub async fn insert_dead_letter(
82        &self,
83        dead_letter: &DeadLetter,
84    ) -> Result<i64, ForceSyncError> {
85        let client = self.pool().get().await?;
86        insert_dead_letter_query(&**client, dead_letter).await
87    }
88}