1use futures::FutureExt;
4use serde_json::Value;
5use tokio_postgres::GenericClient;
6
7use crate::{
8 error::ForceSyncError,
9 identity::SyncKey,
10 model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
11 store::pg::{AppendResult, DeadLetter, PgStore},
12};
13
14struct OutboxRow {
15 outbox_id: i64,
16 tenant: String,
17 object_name: String,
18 external_id: String,
19 source_cursor: String,
20 op: String,
21 tombstone: bool,
22 payload_text: String,
23 created_at: chrono::DateTime<chrono::Utc>,
24}
25
26fn outbox_operation(op: &str, tombstone: bool) -> Result<ChangeOperation, ForceSyncError> {
27 match (op, tombstone) {
28 ("upsert", false) => Ok(ChangeOperation::Upsert),
29 ("delete", true) => Ok(ChangeOperation::Delete),
30 ("upsert", true) | ("delete", false) => Err(ForceSyncError::InvalidOutboxOperation {
31 op: format!("{op}:{tombstone}"),
32 }),
33 (other_op, _) => Err(ForceSyncError::InvalidOutboxOperation {
34 op: other_op.to_string(),
35 }),
36 }
37}
38
39fn outbox_source_cursor(raw: &str) -> Result<SourceCursor, ForceSyncError> {
40 if raw.starts_with("postgres-lsn:")
41 || raw.starts_with("salesforce-replay-id:")
42 || raw.starts_with("snapshot:")
43 {
44 return Err(ForceSyncError::InvalidOutboxCursor {
45 cursor: raw.to_string(),
46 });
47 }
48
49 Ok(SourceCursor::PostgresLsn(raw.to_string()))
50}
51
52fn outbox_envelope(row: &OutboxRow) -> Result<ChangeEnvelope, ForceSyncError> {
53 let payload: Value = serde_json::from_str(&row.payload_text)?;
54 let sync_key = SyncKey::new(
55 row.tenant.clone(),
56 row.object_name.clone(),
57 row.external_id.clone(),
58 )?;
59 let operation = outbox_operation(&row.op, row.tombstone)?;
60
61 Ok(ChangeEnvelope::new(
62 sync_key,
63 SourceSystem::Postgres,
64 operation,
65 row.created_at,
66 payload,
67 )
68 .with_cursor(outbox_source_cursor(&row.source_cursor)?))
69}
70
71const fn row_content_error(error: &ForceSyncError) -> bool {
72 matches!(
73 error,
74 ForceSyncError::Json(_)
75 | ForceSyncError::InvalidOutboxOperation { .. }
76 | ForceSyncError::InvalidOutboxCursor { .. }
77 | ForceSyncError::EmptySyncKeyPart { .. }
78 )
79}
80
81async fn quarantine_row<C>(
82 client: &C,
83 row: &OutboxRow,
84 error: &ForceSyncError,
85) -> Result<(), ForceSyncError>
86where
87 C: GenericClient + Sync + ?Sized,
88{
89 let payload = serde_json::from_str::<Value>(&row.payload_text).ok();
90 let dead_letter = DeadLetter {
91 task_id: None,
92 tenant: Some(row.tenant.clone()),
93 object_name: Some(row.object_name.clone()),
94 external_id: Some(row.external_id.clone()),
95 error_message: error.to_string(),
96 payload,
97 };
98
99 crate::store::pg::dead_letter::insert_dead_letter_in_tx(client, &dead_letter).await?;
100 client
101 .execute(
102 "update force_sync_outbox
103 set processed_at = now()
104 where outbox_id = $1
105 and processed_at is null",
106 &[&row.outbox_id],
107 )
108 .await?;
109 Ok(())
110}
111
112async fn capture_batch_in_tx<C>(
113 client: &C,
114 limit: i64,
115 priority: i32,
116) -> Result<usize, ForceSyncError>
117where
118 C: GenericClient + Sync + ?Sized,
119{
120 if limit <= 0 {
121 return Ok(0);
122 }
123
124 let rows = client
125 .query(
126 "select outbox_id, tenant, object_name, external_id, source_cursor, op, tombstone, payload::text as payload_text, created_at
127 from force_sync_outbox
128 where processed_at is null
129 order by created_at asc, outbox_id asc
130 for update skip locked
131 limit $1",
132 &[&limit],
133 )
134 .await?;
135
136 let mut processed = 0usize;
137
138 for row in rows {
139 let outbox_row = OutboxRow {
140 outbox_id: row.get("outbox_id"),
141 tenant: row.get("tenant"),
142 object_name: row.get("object_name"),
143 external_id: row.get("external_id"),
144 source_cursor: row.get("source_cursor"),
145 op: row.get("op"),
146 tombstone: row.get("tombstone"),
147 payload_text: row.get("payload_text"),
148 created_at: row.get("created_at"),
149 };
150
151 match outbox_envelope(&outbox_row) {
152 Ok(envelope) => match PgStore::append_journal_if_new_in_tx(client, &envelope).await? {
153 AppendResult::Inserted { journal_id } => {
154 PgStore::enqueue_apply_task_in_tx(client, journal_id, priority).await?;
155 client
156 .execute(
157 "update force_sync_outbox
158 set processed_at = now()
159 where outbox_id = $1
160 and processed_at is null",
161 &[&outbox_row.outbox_id],
162 )
163 .await?;
164 processed += 1;
165 }
166 AppendResult::Duplicate => {
167 client
168 .execute(
169 "update force_sync_outbox
170 set processed_at = now()
171 where outbox_id = $1
172 and processed_at is null",
173 &[&outbox_row.outbox_id],
174 )
175 .await?;
176 processed += 1;
177 }
178 },
179 Err(error) if row_content_error(&error) => {
180 quarantine_row(client, &outbox_row, &error).await?;
181 processed += 1;
182 }
183 Err(error) => return Err(error),
184 }
185 }
186
187 Ok(processed)
188}
189
190pub async fn capture_batch(
197 store: &PgStore,
198 limit: i64,
199 priority: i32,
200) -> Result<usize, ForceSyncError> {
201 store
202 .with_transaction(|tx| {
203 async move { capture_batch_in_tx(tx, limit, priority).await }.boxed()
204 })
205 .await
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{ChangeOperation, SourceCursor};

    /// True when the `op` / `tombstone` pair is rejected as an invalid outbox operation.
    fn operation_rejected(op: &str, tombstone: bool) -> bool {
        matches!(
            outbox_operation(op, tombstone),
            Err(ForceSyncError::InvalidOutboxOperation { .. })
        )
    }

    /// True when `raw` is rejected as an invalid outbox cursor.
    fn cursor_rejected(raw: &str) -> bool {
        matches!(
            outbox_source_cursor(raw),
            Err(ForceSyncError::InvalidOutboxCursor { .. })
        )
    }

    // --- outbox_operation ---

    #[test]
    fn outbox_operation_upsert_not_tombstone() {
        assert!(matches!(
            outbox_operation("upsert", false),
            Ok(ChangeOperation::Upsert)
        ));
    }

    #[test]
    fn outbox_operation_delete_tombstone() {
        assert!(matches!(
            outbox_operation("delete", true),
            Ok(ChangeOperation::Delete)
        ));
    }

    #[test]
    fn outbox_operation_upsert_tombstone_is_invalid() {
        assert!(operation_rejected("upsert", true));
    }

    #[test]
    fn outbox_operation_delete_not_tombstone_is_invalid() {
        assert!(operation_rejected("delete", false));
    }

    #[test]
    fn outbox_operation_unknown_op_is_invalid() {
        assert!(operation_rejected("insert", false));
    }

    // --- outbox_source_cursor ---

    #[test]
    fn outbox_source_cursor_plain_lsn_succeeds() {
        assert!(matches!(
            outbox_source_cursor("0/16B3740"),
            Ok(SourceCursor::PostgresLsn(lsn)) if lsn == "0/16B3740"
        ));
    }

    #[test]
    fn outbox_source_cursor_rejects_postgres_lsn_prefix() {
        assert!(cursor_rejected("postgres-lsn:0/16B3740"));
    }

    #[test]
    fn outbox_source_cursor_rejects_salesforce_replay_id_prefix() {
        assert!(cursor_rejected("salesforce-replay-id:42"));
    }

    #[test]
    fn outbox_source_cursor_rejects_snapshot_prefix() {
        assert!(cursor_rejected("snapshot:abc"));
    }

    // --- row_content_error ---

    #[test]
    fn row_content_error_recognizes_json_error() {
        match serde_json::from_str::<Value>("{{invalid}}") {
            Err(json_err) => assert!(row_content_error(&ForceSyncError::from(json_err))),
            Ok(_) => panic!("expected invalid JSON to fail"),
        }
    }

    #[test]
    fn row_content_error_recognizes_invalid_outbox_operation() {
        assert!(row_content_error(&ForceSyncError::InvalidOutboxOperation {
            op: "bad".to_string(),
        }));
    }

    #[test]
    fn row_content_error_recognizes_invalid_outbox_cursor() {
        assert!(row_content_error(&ForceSyncError::InvalidOutboxCursor {
            cursor: "bad".to_string(),
        }));
    }

    #[test]
    fn row_content_error_recognizes_empty_sync_key_part() {
        assert!(row_content_error(&ForceSyncError::EmptySyncKeyPart {
            part: "tenant"
        }));
    }

    #[test]
    fn row_content_error_rejects_missing_source_cursor() {
        assert!(!row_content_error(&ForceSyncError::MissingSourceCursor));
    }
}