Skip to main content

force_sync/store/pg/
journal.rs

1//! Journal write helpers for the `PostgreSQL` sync store.
2
3use chrono::{DateTime, Utc};
4use serde_json::Value;
5use tokio_postgres::GenericClient;
6
7use crate::{
8    error::ForceSyncError,
9    model::{ChangeEnvelope, ChangeOperation},
10};
11
12use super::PgStore;
13
/// Outcome of a duplicate-tolerant journal append.
///
/// Produced by the `*_if_new` append helpers, which skip the insert when the
/// `(source, source_cursor)` pair is already present instead of failing.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AppendResult {
    /// A new journal row was written.
    Inserted {
        /// Identifier of the newly inserted journal row.
        journal_id: i64,
    },
    /// Nothing was written because a row with the same
    /// `(source, source_cursor)` already exists.
    Duplicate,
}
25
26struct JournalValues {
27    tenant: String,
28    object_name: String,
29    external_id: String,
30    source: &'static str,
31    source_cursor: String,
32    observed_at: DateTime<Utc>,
33    operation: &'static str,
34    tombstone: bool,
35    payload: Value,
36    payload_hash: [u8; 32],
37}
38
39fn journal_values(envelope: &ChangeEnvelope) -> Result<JournalValues, ForceSyncError> {
40    let cursor = envelope
41        .cursor()
42        .ok_or(ForceSyncError::MissingSourceCursor)?;
43
44    Ok(JournalValues {
45        tenant: envelope.sync_key().tenant().to_owned(),
46        object_name: envelope.sync_key().object_name().to_owned(),
47        external_id: envelope.sync_key().external_id().to_owned(),
48        source: envelope.source().as_db_value(),
49        source_cursor: cursor.as_db_value(),
50        observed_at: envelope.observed_at(),
51        operation: envelope.operation().as_db_value(),
52        tombstone: matches!(envelope.operation(), ChangeOperation::Delete),
53        payload: envelope.payload().clone(),
54        payload_hash: envelope.payload_hash(),
55    })
56}
57
58async fn insert_journal<C>(client: &C, values: &JournalValues) -> Result<i64, ForceSyncError>
59where
60    C: GenericClient + Sync + ?Sized,
61{
62    let payload_hash = values.payload_hash.as_slice();
63    let row = client
64        .query_one(
65            "insert into sync_journal (
66                tenant,
67                object_name,
68                external_id,
69                source,
70                source_cursor,
71                observed_at,
72                operation,
73                tombstone,
74                payload,
75                payload_hash,
76                schema_version
77            ) values (
78                $1,
79                $2,
80                $3,
81                $4,
82                $5,
83                $6::timestamptz,
84                $7,
85                $8,
86                $9::jsonb,
87                $10,
88                1
89            ) returning journal_id",
90            &[
91                &values.tenant,
92                &values.object_name,
93                &values.external_id,
94                &values.source,
95                &values.source_cursor,
96                &values.observed_at,
97                &values.operation,
98                &values.tombstone,
99                &values.payload,
100                &payload_hash,
101            ],
102        )
103        .await?;
104
105    Ok(row.get(0))
106}
107
108async fn insert_journal_if_new<C>(
109    client: &C,
110    values: &JournalValues,
111) -> Result<Option<i64>, ForceSyncError>
112where
113    C: GenericClient + Sync + ?Sized,
114{
115    let payload_hash = values.payload_hash.as_slice();
116    let row = client
117        .query_opt(
118            "insert into sync_journal (
119                tenant,
120                object_name,
121                external_id,
122                source,
123                source_cursor,
124                observed_at,
125                operation,
126                tombstone,
127                payload,
128                payload_hash,
129                schema_version
130            ) values (
131                $1,
132                $2,
133                $3,
134                $4,
135                $5,
136                $6::timestamptz,
137                $7,
138                $8,
139                $9::jsonb,
140                $10,
141                1
142            ) on conflict (source, source_cursor) do nothing
143            returning journal_id",
144            &[
145                &values.tenant,
146                &values.object_name,
147                &values.external_id,
148                &values.source,
149                &values.source_cursor,
150                &values.observed_at,
151                &values.operation,
152                &values.tombstone,
153                &values.payload,
154                &payload_hash,
155            ],
156        )
157        .await?;
158
159    Ok(row.map(|row| row.get(0)))
160}
161
162impl PgStore {
163    /// Appends a journal entry and returns its database identifier.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the cursor is missing or the database write fails.
168    pub async fn append_journal(&self, envelope: &ChangeEnvelope) -> Result<i64, ForceSyncError> {
169        let values = journal_values(envelope)?;
170        let client = self.pool().get().await?;
171        insert_journal(&**client, &values).await
172    }
173
174    /// Appends a journal entry if the `(source, source_cursor)` pair is new.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if the cursor is missing or the database write fails.
179    pub async fn append_journal_if_new(
180        &self,
181        envelope: &ChangeEnvelope,
182    ) -> Result<AppendResult, ForceSyncError> {
183        let values = journal_values(envelope)?;
184        let client = self.pool().get().await?;
185        insert_journal_if_new(&**client, &values)
186            .await?
187            .map_or_else(
188                || Ok(AppendResult::Duplicate),
189                |journal_id| Ok(AppendResult::Inserted { journal_id }),
190            )
191    }
192
193    /// Appends a journal row in an existing transaction.
194    ///
195    /// This helper exists so same-transaction workflows can remain atomic
196    /// without forcing the caller through another store method.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the cursor is missing or the database write fails.
201    pub async fn append_journal_in_tx<C>(
202        client: &C,
203        envelope: &ChangeEnvelope,
204    ) -> Result<i64, ForceSyncError>
205    where
206        C: GenericClient + Sync + ?Sized,
207    {
208        let values = journal_values(envelope)?;
209        insert_journal(client, &values).await
210    }
211
212    /// Appends a journal row in an existing transaction, skipping duplicates.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if the cursor is missing or the database write fails.
217    pub async fn append_journal_if_new_in_tx<C>(
218        client: &C,
219        envelope: &ChangeEnvelope,
220    ) -> Result<AppendResult, ForceSyncError>
221    where
222        C: GenericClient + Sync + ?Sized,
223    {
224        let values = journal_values(envelope)?;
225
226        insert_journal_if_new(client, &values).await?.map_or_else(
227            || Ok(AppendResult::Duplicate),
228            |journal_id| Ok(AppendResult::Inserted { journal_id }),
229        )
230    }
231}
232
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use crate::{
        identity::SyncKey,
        model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
    };

    use super::journal_values;

    /// `journal_values` must carry the timestamp and payload through as typed
    /// values (`DateTime<Utc>` and `serde_json::Value`) with unchanged content.
    #[test]
    fn journal_values_keep_observed_at_as_datetime() {
        let observed_at = Utc::now();
        let expected_payload = json!({"Name": "Acme"});

        let sync_key = SyncKey::new("tenant", "Account", "external-1")
            .unwrap_or_else(|error| panic!("unexpected sync key construction error: {error}"));
        let cursor = SourceCursor::PostgresLsn("lsn-1".to_owned());

        let envelope = ChangeEnvelope::new(
            sync_key,
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            observed_at,
            expected_payload.clone(),
        )
        .with_cursor(cursor);

        let values = journal_values(&envelope)
            .unwrap_or_else(|error| panic!("unexpected journal values error: {error}"));

        // Compile-time checks: these bindings only type-check if the fields
        // keep their typed representations.
        let _: chrono::DateTime<chrono::Utc> = values.observed_at;
        let _: &serde_json::Value = &values.payload;

        assert_eq!(values.observed_at, observed_at);
        assert_eq!(values.payload, expected_payload);
    }
}