1use tokio_postgres::GenericClient;
4
5use crate::error::ForceSyncError;
6
7use super::PgStore;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct SyncLink {
12 pub tenant: String,
14 pub object_name: String,
16 pub external_id: String,
18 pub salesforce_id: Option<String>,
20 pub postgres_id: Option<String>,
22 pub last_source: Option<String>,
24 pub last_source_cursor: Option<String>,
26 pub last_payload_hash: Option<Vec<u8>>,
28 pub tombstone: bool,
30}
31
32fn link_from_row(row: &tokio_postgres::Row) -> SyncLink {
33 SyncLink {
34 tenant: row.get(0),
35 object_name: row.get(1),
36 external_id: row.get(2),
37 salesforce_id: row.get(3),
38 postgres_id: row.get(4),
39 last_source: row.get(5),
40 last_source_cursor: row.get(6),
41 last_payload_hash: row.get(7),
42 tombstone: row.get(8),
43 }
44}
45
46async fn put_link_query<C>(client: &C, link: &SyncLink) -> Result<i64, ForceSyncError>
47where
48 C: GenericClient + Sync + ?Sized,
49{
50 let payload_hash = link.last_payload_hash.as_deref();
51 let row = client
52 .query_one(
53 "insert into sync_link (
54 tenant,
55 object_name,
56 external_id,
57 salesforce_id,
58 postgres_id,
59 last_source,
60 last_source_cursor,
61 last_payload_hash,
62 tombstone,
63 updated_at
64 ) values (
65 $1,
66 $2,
67 $3,
68 $4,
69 $5,
70 $6,
71 $7,
72 $8,
73 $9,
74 now()
75 ) on conflict (tenant, object_name, external_id) do update set
76 salesforce_id = excluded.salesforce_id,
77 postgres_id = excluded.postgres_id,
78 last_source = excluded.last_source,
79 last_source_cursor = excluded.last_source_cursor,
80 last_payload_hash = excluded.last_payload_hash,
81 tombstone = excluded.tombstone,
82 updated_at = now()
83 returning link_id",
84 &[
85 &link.tenant,
86 &link.object_name,
87 &link.external_id,
88 &link.salesforce_id,
89 &link.postgres_id,
90 &link.last_source,
91 &link.last_source_cursor,
92 &payload_hash,
93 &link.tombstone,
94 ],
95 )
96 .await?;
97
98 Ok(row.get(0))
99}
100
101async fn get_link_query<C>(
102 client: &C,
103 tenant: &str,
104 object_name: &str,
105 external_id: &str,
106) -> Result<Option<SyncLink>, ForceSyncError>
107where
108 C: GenericClient + Sync + ?Sized,
109{
110 let row = client
111 .query_opt(
112 "select tenant, object_name, external_id, salesforce_id, postgres_id,
113 last_source, last_source_cursor, last_payload_hash, tombstone
114 from sync_link
115 where tenant = $1 and object_name = $2 and external_id = $3",
116 &[&tenant, &object_name, &external_id],
117 )
118 .await?;
119
120 Ok(row.map(|row| link_from_row(&row)))
121}
122
123impl PgStore {
124 pub async fn put_link(&self, link: &SyncLink) -> Result<i64, ForceSyncError> {
130 let client = self.pool().get().await?;
131 put_link_query(&**client, link).await
132 }
133
134 pub async fn get_link(
140 &self,
141 tenant: &str,
142 object_name: &str,
143 external_id: &str,
144 ) -> Result<Option<SyncLink>, ForceSyncError> {
145 let client = self.pool().get().await?;
146 get_link_query(&**client, tenant, object_name, external_id).await
147 }
148}