1use chrono::{DateTime, Utc};
4use serde_json::Value;
5use tokio_postgres::GenericClient;
6
7use crate::{
8 error::ForceSyncError,
9 model::{ChangeEnvelope, ChangeOperation},
10};
11
12use super::PgStore;
13
/// Outcome of a conditional journal append.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AppendResult {
    /// A new journal row was written.
    Inserted {
        /// The `journal_id` returned by the insert statement.
        journal_id: i64,
    },
    /// The insert produced no row because the `(source, source_cursor)` conflict
    /// clause skipped it.
    Duplicate,
}
25
/// Owned column values for a single `sync_journal` row.
///
/// Built by `journal_values` from a `ChangeEnvelope` and bound, field by field,
/// as the statement parameters of the insert helpers in this module.
struct JournalValues {
    /// Tenant component of the envelope's sync key.
    tenant: String,
    /// Object name component of the envelope's sync key.
    object_name: String,
    /// External id component of the envelope's sync key.
    external_id: String,
    /// Database representation of the envelope's source (`as_db_value()`).
    source: &'static str,
    /// Database representation of the envelope's cursor (`as_db_value()`).
    source_cursor: String,
    /// Observation timestamp, kept as a typed `DateTime<Utc>` rather than a string.
    observed_at: DateTime<Utc>,
    /// Database representation of the envelope's operation (`as_db_value()`).
    operation: &'static str,
    /// `true` exactly when the operation is `ChangeOperation::Delete`.
    tombstone: bool,
    /// Clone of the envelope's JSON payload.
    payload: Value,
    /// 32-byte value returned by the envelope's `payload_hash()`.
    payload_hash: [u8; 32],
}
38
39fn journal_values(envelope: &ChangeEnvelope) -> Result<JournalValues, ForceSyncError> {
40 let cursor = envelope
41 .cursor()
42 .ok_or(ForceSyncError::MissingSourceCursor)?;
43
44 Ok(JournalValues {
45 tenant: envelope.sync_key().tenant().to_owned(),
46 object_name: envelope.sync_key().object_name().to_owned(),
47 external_id: envelope.sync_key().external_id().to_owned(),
48 source: envelope.source().as_db_value(),
49 source_cursor: cursor.as_db_value(),
50 observed_at: envelope.observed_at(),
51 operation: envelope.operation().as_db_value(),
52 tombstone: matches!(envelope.operation(), ChangeOperation::Delete),
53 payload: envelope.payload().clone(),
54 payload_hash: envelope.payload_hash(),
55 })
56}
57
58async fn insert_journal<C>(client: &C, values: &JournalValues) -> Result<i64, ForceSyncError>
59where
60 C: GenericClient + Sync + ?Sized,
61{
62 let payload_hash = values.payload_hash.as_slice();
63 let row = client
64 .query_one(
65 "insert into sync_journal (
66 tenant,
67 object_name,
68 external_id,
69 source,
70 source_cursor,
71 observed_at,
72 operation,
73 tombstone,
74 payload,
75 payload_hash,
76 schema_version
77 ) values (
78 $1,
79 $2,
80 $3,
81 $4,
82 $5,
83 $6::timestamptz,
84 $7,
85 $8,
86 $9::jsonb,
87 $10,
88 1
89 ) returning journal_id",
90 &[
91 &values.tenant,
92 &values.object_name,
93 &values.external_id,
94 &values.source,
95 &values.source_cursor,
96 &values.observed_at,
97 &values.operation,
98 &values.tombstone,
99 &values.payload,
100 &payload_hash,
101 ],
102 )
103 .await?;
104
105 Ok(row.get(0))
106}
107
108async fn insert_journal_if_new<C>(
109 client: &C,
110 values: &JournalValues,
111) -> Result<Option<i64>, ForceSyncError>
112where
113 C: GenericClient + Sync + ?Sized,
114{
115 let payload_hash = values.payload_hash.as_slice();
116 let row = client
117 .query_opt(
118 "insert into sync_journal (
119 tenant,
120 object_name,
121 external_id,
122 source,
123 source_cursor,
124 observed_at,
125 operation,
126 tombstone,
127 payload,
128 payload_hash,
129 schema_version
130 ) values (
131 $1,
132 $2,
133 $3,
134 $4,
135 $5,
136 $6::timestamptz,
137 $7,
138 $8,
139 $9::jsonb,
140 $10,
141 1
142 ) on conflict (source, source_cursor) do nothing
143 returning journal_id",
144 &[
145 &values.tenant,
146 &values.object_name,
147 &values.external_id,
148 &values.source,
149 &values.source_cursor,
150 &values.observed_at,
151 &values.operation,
152 &values.tombstone,
153 &values.payload,
154 &payload_hash,
155 ],
156 )
157 .await?;
158
159 Ok(row.map(|row| row.get(0)))
160}
161
162impl PgStore {
163 pub async fn append_journal(&self, envelope: &ChangeEnvelope) -> Result<i64, ForceSyncError> {
169 let values = journal_values(envelope)?;
170 let client = self.pool().get().await?;
171 insert_journal(&**client, &values).await
172 }
173
174 pub async fn append_journal_if_new(
180 &self,
181 envelope: &ChangeEnvelope,
182 ) -> Result<AppendResult, ForceSyncError> {
183 let values = journal_values(envelope)?;
184 let client = self.pool().get().await?;
185 insert_journal_if_new(&**client, &values)
186 .await?
187 .map_or_else(
188 || Ok(AppendResult::Duplicate),
189 |journal_id| Ok(AppendResult::Inserted { journal_id }),
190 )
191 }
192
193 pub async fn append_journal_in_tx<C>(
202 client: &C,
203 envelope: &ChangeEnvelope,
204 ) -> Result<i64, ForceSyncError>
205 where
206 C: GenericClient + Sync + ?Sized,
207 {
208 let values = journal_values(envelope)?;
209 insert_journal(client, &values).await
210 }
211
212 pub async fn append_journal_if_new_in_tx<C>(
218 client: &C,
219 envelope: &ChangeEnvelope,
220 ) -> Result<AppendResult, ForceSyncError>
221 where
222 C: GenericClient + Sync + ?Sized,
223 {
224 let values = journal_values(envelope)?;
225
226 insert_journal_if_new(client, &values).await?.map_or_else(
227 || Ok(AppendResult::Duplicate),
228 |journal_id| Ok(AppendResult::Inserted { journal_id }),
229 )
230 }
231}
232
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use crate::{
        identity::SyncKey,
        model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
    };

    use super::journal_values;

    /// `journal_values` must carry the timestamp and payload through as typed
    /// values, unchanged from what the envelope was built with.
    #[test]
    fn journal_values_keep_observed_at_as_datetime() {
        let now = Utc::now();
        let sync_key = SyncKey::new("tenant", "Account", "external-1")
            .unwrap_or_else(|error| panic!("unexpected sync key construction error: {error}"));
        let cursor = SourceCursor::PostgresLsn("lsn-1".to_owned());

        let envelope = ChangeEnvelope::new(
            sync_key,
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            now,
            json!({"Name": "Acme"}),
        )
        .with_cursor(cursor);

        let row_values = journal_values(&envelope)
            .unwrap_or_else(|error| panic!("unexpected journal values error: {error}"));

        // Compile-time checks: the fields keep their typed representations.
        let _: chrono::DateTime<chrono::Utc> = row_values.observed_at;
        let _: &serde_json::Value = &row_values.payload;

        assert_eq!(row_values.observed_at, now);
        assert_eq!(row_values.payload, json!({"Name": "Acme"}));
    }
}