Skip to main content

force_sync/capture/
postgres.rs

1//! `PostgreSQL` outbox capture for force-sync.
2
3use futures::FutureExt;
4use serde_json::Value;
5use tokio_postgres::GenericClient;
6
7use crate::{
8    error::ForceSyncError,
9    identity::SyncKey,
10    model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
11    store::pg::{AppendResult, DeadLetter, PgStore},
12};
13
14struct OutboxRow {
15    outbox_id: i64,
16    tenant: String,
17    object_name: String,
18    external_id: String,
19    source_cursor: String,
20    op: String,
21    tombstone: bool,
22    payload_text: String,
23    created_at: chrono::DateTime<chrono::Utc>,
24}
25
26fn outbox_operation(op: &str, tombstone: bool) -> Result<ChangeOperation, ForceSyncError> {
27    match (op, tombstone) {
28        ("upsert", false) => Ok(ChangeOperation::Upsert),
29        ("delete", true) => Ok(ChangeOperation::Delete),
30        ("upsert", true) | ("delete", false) => Err(ForceSyncError::InvalidOutboxOperation {
31            op: format!("{op}:{tombstone}"),
32        }),
33        (other_op, _) => Err(ForceSyncError::InvalidOutboxOperation {
34            op: other_op.to_string(),
35        }),
36    }
37}
38
39fn outbox_source_cursor(raw: &str) -> Result<SourceCursor, ForceSyncError> {
40    if raw.starts_with("postgres-lsn:")
41        || raw.starts_with("salesforce-replay-id:")
42        || raw.starts_with("snapshot:")
43    {
44        return Err(ForceSyncError::InvalidOutboxCursor {
45            cursor: raw.to_string(),
46        });
47    }
48
49    Ok(SourceCursor::PostgresLsn(raw.to_string()))
50}
51
52fn outbox_envelope(row: &OutboxRow) -> Result<ChangeEnvelope, ForceSyncError> {
53    let payload: Value = serde_json::from_str(&row.payload_text)?;
54    let sync_key = SyncKey::new(
55        row.tenant.clone(),
56        row.object_name.clone(),
57        row.external_id.clone(),
58    )?;
59    let operation = outbox_operation(&row.op, row.tombstone)?;
60
61    Ok(ChangeEnvelope::new(
62        sync_key,
63        SourceSystem::Postgres,
64        operation,
65        row.created_at,
66        payload,
67    )
68    .with_cursor(outbox_source_cursor(&row.source_cursor)?))
69}
70
71const fn row_content_error(error: &ForceSyncError) -> bool {
72    matches!(
73        error,
74        ForceSyncError::Json(_)
75            | ForceSyncError::InvalidOutboxOperation { .. }
76            | ForceSyncError::InvalidOutboxCursor { .. }
77            | ForceSyncError::EmptySyncKeyPart { .. }
78    )
79}
80
81async fn quarantine_row<C>(
82    client: &C,
83    row: &OutboxRow,
84    error: &ForceSyncError,
85) -> Result<(), ForceSyncError>
86where
87    C: GenericClient + Sync + ?Sized,
88{
89    let payload = serde_json::from_str::<Value>(&row.payload_text).ok();
90    let dead_letter = DeadLetter {
91        task_id: None,
92        tenant: Some(row.tenant.clone()),
93        object_name: Some(row.object_name.clone()),
94        external_id: Some(row.external_id.clone()),
95        error_message: error.to_string(),
96        payload,
97    };
98
99    crate::store::pg::dead_letter::insert_dead_letter_in_tx(client, &dead_letter).await?;
100    client
101        .execute(
102            "update force_sync_outbox
103             set processed_at = now()
104             where outbox_id = $1
105               and processed_at is null",
106            &[&row.outbox_id],
107        )
108        .await?;
109    Ok(())
110}
111
112async fn capture_batch_in_tx<C>(
113    client: &C,
114    limit: i64,
115    priority: i32,
116) -> Result<usize, ForceSyncError>
117where
118    C: GenericClient + Sync + ?Sized,
119{
120    if limit <= 0 {
121        return Ok(0);
122    }
123
124    let rows = client
125        .query(
126            "select outbox_id, tenant, object_name, external_id, source_cursor, op, tombstone, payload::text as payload_text, created_at
127             from force_sync_outbox
128             where processed_at is null
129             order by created_at asc, outbox_id asc
130             for update skip locked
131             limit $1",
132            &[&limit],
133        )
134        .await?;
135
136    let mut processed = 0usize;
137
138    for row in rows {
139        let outbox_row = OutboxRow {
140            outbox_id: row.get("outbox_id"),
141            tenant: row.get("tenant"),
142            object_name: row.get("object_name"),
143            external_id: row.get("external_id"),
144            source_cursor: row.get("source_cursor"),
145            op: row.get("op"),
146            tombstone: row.get("tombstone"),
147            payload_text: row.get("payload_text"),
148            created_at: row.get("created_at"),
149        };
150
151        match outbox_envelope(&outbox_row) {
152            Ok(envelope) => match PgStore::append_journal_if_new_in_tx(client, &envelope).await? {
153                AppendResult::Inserted { journal_id } => {
154                    PgStore::enqueue_apply_task_in_tx(client, journal_id, priority).await?;
155                    client
156                        .execute(
157                            "update force_sync_outbox
158                             set processed_at = now()
159                             where outbox_id = $1
160                               and processed_at is null",
161                            &[&outbox_row.outbox_id],
162                        )
163                        .await?;
164                    processed += 1;
165                }
166                AppendResult::Duplicate => {
167                    client
168                        .execute(
169                            "update force_sync_outbox
170                             set processed_at = now()
171                             where outbox_id = $1
172                               and processed_at is null",
173                            &[&outbox_row.outbox_id],
174                        )
175                        .await?;
176                    processed += 1;
177                }
178            },
179            Err(error) if row_content_error(&error) => {
180                quarantine_row(client, &outbox_row, &error).await?;
181                processed += 1;
182            }
183            Err(error) => return Err(error),
184        }
185    }
186
187    Ok(processed)
188}
189
190/// Captures a batch of unprocessed `PostgreSQL` outbox rows into the sync journal.
191///
192/// # Errors
193///
194/// Returns a database error if the transaction cannot be opened or a row
195/// cannot be converted into a sync envelope.
196pub async fn capture_batch(
197    store: &PgStore,
198    limit: i64,
199    priority: i32,
200) -> Result<usize, ForceSyncError> {
201    store
202        .with_transaction(|tx| {
203            async move { capture_batch_in_tx(tx, limit, priority).await }.boxed()
204        })
205        .await
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use crate::model::{ChangeOperation, SourceCursor};
212
213    // ── outbox_operation ──────────────────────────────────────────────
214
215    #[test]
216    fn outbox_operation_upsert_not_tombstone() {
217        let Ok(op) = outbox_operation("upsert", false) else {
218            panic!("expected Ok for upsert/false");
219        };
220        assert_eq!(op, ChangeOperation::Upsert);
221    }
222
223    #[test]
224    fn outbox_operation_delete_tombstone() {
225        let Ok(op) = outbox_operation("delete", true) else {
226            panic!("expected Ok for delete/true");
227        };
228        assert_eq!(op, ChangeOperation::Delete);
229    }
230
231    #[test]
232    fn outbox_operation_upsert_tombstone_is_invalid() {
233        assert!(matches!(
234            outbox_operation("upsert", true),
235            Err(ForceSyncError::InvalidOutboxOperation { .. })
236        ));
237    }
238
239    #[test]
240    fn outbox_operation_delete_not_tombstone_is_invalid() {
241        assert!(matches!(
242            outbox_operation("delete", false),
243            Err(ForceSyncError::InvalidOutboxOperation { .. })
244        ));
245    }
246
247    #[test]
248    fn outbox_operation_unknown_op_is_invalid() {
249        assert!(matches!(
250            outbox_operation("insert", false),
251            Err(ForceSyncError::InvalidOutboxOperation { .. })
252        ));
253    }
254
255    // ── outbox_source_cursor ──────────────────────────────────────────
256
257    #[test]
258    fn outbox_source_cursor_plain_lsn_succeeds() {
259        let Ok(cursor) = outbox_source_cursor("0/16B3740") else {
260            panic!("expected Ok for plain LSN");
261        };
262        assert!(matches!(cursor, SourceCursor::PostgresLsn(ref lsn) if lsn == "0/16B3740"));
263    }
264
265    #[test]
266    fn outbox_source_cursor_rejects_postgres_lsn_prefix() {
267        assert!(matches!(
268            outbox_source_cursor("postgres-lsn:0/16B3740"),
269            Err(ForceSyncError::InvalidOutboxCursor { .. })
270        ));
271    }
272
273    #[test]
274    fn outbox_source_cursor_rejects_salesforce_replay_id_prefix() {
275        assert!(matches!(
276            outbox_source_cursor("salesforce-replay-id:42"),
277            Err(ForceSyncError::InvalidOutboxCursor { .. })
278        ));
279    }
280
281    #[test]
282    fn outbox_source_cursor_rejects_snapshot_prefix() {
283        assert!(matches!(
284            outbox_source_cursor("snapshot:abc"),
285            Err(ForceSyncError::InvalidOutboxCursor { .. })
286        ));
287    }
288
289    // ── row_content_error ─────────────────────────────────────────────
290
291    #[test]
292    fn row_content_error_recognizes_json_error() {
293        let Err(json_err) = serde_json::from_str::<Value>("{{invalid}}") else {
294            panic!("expected invalid JSON to fail");
295        };
296        let err: ForceSyncError = json_err.into();
297        assert!(row_content_error(&err));
298    }
299
300    #[test]
301    fn row_content_error_recognizes_invalid_outbox_operation() {
302        let err = ForceSyncError::InvalidOutboxOperation {
303            op: "bad".to_string(),
304        };
305        assert!(row_content_error(&err));
306    }
307
308    #[test]
309    fn row_content_error_recognizes_invalid_outbox_cursor() {
310        let err = ForceSyncError::InvalidOutboxCursor {
311            cursor: "bad".to_string(),
312        };
313        assert!(row_content_error(&err));
314    }
315
316    #[test]
317    fn row_content_error_recognizes_empty_sync_key_part() {
318        let err = ForceSyncError::EmptySyncKeyPart { part: "tenant" };
319        assert!(row_content_error(&err));
320    }
321
322    #[test]
323    fn row_content_error_rejects_missing_source_cursor() {
324        let err = ForceSyncError::MissingSourceCursor;
325        assert!(!row_content_error(&err));
326    }
327}