Skip to main content

force_sync/store/pg/
link.rs

1//! Link repository helpers for the `PostgreSQL` sync store.
2
3use tokio_postgres::GenericClient;
4
5use crate::error::ForceSyncError;
6
7use super::PgStore;
8
/// One row of the `sync_link` table, identified by tenant, object name and
/// external ID.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SyncLink {
    /// Tenant that owns the link.
    pub tenant: String,
    /// Name of the Salesforce object the link belongs to.
    pub object_name: String,
    /// Canonical external ID of the linked record.
    pub external_id: String,
    /// Salesforce-side ID alias; `None` when it is not known.
    pub salesforce_id: Option<String>,
    /// Postgres-side ID alias; `None` when it is not known.
    pub postgres_id: Option<String>,
    /// Source that performed the most recent write, if recorded.
    pub last_source: Option<String>,
    /// Cursor of that source at the most recent write, if recorded.
    pub last_source_cursor: Option<String>,
    /// Hash of the payload from the most recent write, if recorded.
    pub last_payload_hash: Option<Vec<u8>>,
    /// `true` when the link represents a tombstone.
    pub tombstone: bool,
}
31
32fn link_from_row(row: &tokio_postgres::Row) -> SyncLink {
33    SyncLink {
34        tenant: row.get(0),
35        object_name: row.get(1),
36        external_id: row.get(2),
37        salesforce_id: row.get(3),
38        postgres_id: row.get(4),
39        last_source: row.get(5),
40        last_source_cursor: row.get(6),
41        last_payload_hash: row.get(7),
42        tombstone: row.get(8),
43    }
44}
45
46async fn put_link_query<C>(client: &C, link: &SyncLink) -> Result<i64, ForceSyncError>
47where
48    C: GenericClient + Sync + ?Sized,
49{
50    let payload_hash = link.last_payload_hash.as_deref();
51    let row = client
52        .query_one(
53            "insert into sync_link (
54                tenant,
55                object_name,
56                external_id,
57                salesforce_id,
58                postgres_id,
59                last_source,
60                last_source_cursor,
61                last_payload_hash,
62                tombstone,
63                updated_at
64            ) values (
65                $1,
66                $2,
67                $3,
68                $4,
69                $5,
70                $6,
71                $7,
72                $8,
73                $9,
74                now()
75            ) on conflict (tenant, object_name, external_id) do update set
76                salesforce_id = excluded.salesforce_id,
77                postgres_id = excluded.postgres_id,
78                last_source = excluded.last_source,
79                last_source_cursor = excluded.last_source_cursor,
80                last_payload_hash = excluded.last_payload_hash,
81                tombstone = excluded.tombstone,
82                updated_at = now()
83            returning link_id",
84            &[
85                &link.tenant,
86                &link.object_name,
87                &link.external_id,
88                &link.salesforce_id,
89                &link.postgres_id,
90                &link.last_source,
91                &link.last_source_cursor,
92                &payload_hash,
93                &link.tombstone,
94            ],
95        )
96        .await?;
97
98    Ok(row.get(0))
99}
100
101async fn get_link_query<C>(
102    client: &C,
103    tenant: &str,
104    object_name: &str,
105    external_id: &str,
106) -> Result<Option<SyncLink>, ForceSyncError>
107where
108    C: GenericClient + Sync + ?Sized,
109{
110    let row = client
111        .query_opt(
112            "select tenant, object_name, external_id, salesforce_id, postgres_id,
113                    last_source, last_source_cursor, last_payload_hash, tombstone
114             from sync_link
115             where tenant = $1 and object_name = $2 and external_id = $3",
116            &[&tenant, &object_name, &external_id],
117        )
118        .await?;
119
120    Ok(row.map(|row| link_from_row(&row)))
121}
122
123impl PgStore {
124    /// Upserts a link row and returns the database identifier.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the database write fails.
129    pub async fn put_link(&self, link: &SyncLink) -> Result<i64, ForceSyncError> {
130        let client = self.pool().get().await?;
131        put_link_query(&**client, link).await
132    }
133
134    /// Loads a link row by canonical identity.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the database query fails.
139    pub async fn get_link(
140        &self,
141        tenant: &str,
142        object_name: &str,
143        external_id: &str,
144    ) -> Result<Option<SyncLink>, ForceSyncError> {
145        let client = self.pool().get().await?;
146        get_link_query(&**client, tenant, object_name, external_id).await
147    }
148}