use futures::FutureExt;
use serde_json::Value;
use tokio_postgres::GenericClient;
use crate::{
error::ForceSyncError,
identity::SyncKey,
model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
store::pg::{AppendResult, DeadLetter, PgStore},
};
struct OutboxRow {
outbox_id: i64,
tenant: String,
object_name: String,
external_id: String,
source_cursor: String,
op: String,
tombstone: bool,
payload_text: String,
created_at: chrono::DateTime<chrono::Utc>,
}
fn outbox_operation(op: &str, tombstone: bool) -> Result<ChangeOperation, ForceSyncError> {
match (op, tombstone) {
("upsert", false) => Ok(ChangeOperation::Upsert),
("delete", true) => Ok(ChangeOperation::Delete),
("upsert", true) | ("delete", false) => Err(ForceSyncError::InvalidOutboxOperation {
op: format!("{op}:{tombstone}"),
}),
(other_op, _) => Err(ForceSyncError::InvalidOutboxOperation {
op: other_op.to_string(),
}),
}
}
fn outbox_source_cursor(raw: &str) -> Result<SourceCursor, ForceSyncError> {
if raw.starts_with("postgres-lsn:")
|| raw.starts_with("salesforce-replay-id:")
|| raw.starts_with("snapshot:")
{
return Err(ForceSyncError::InvalidOutboxCursor {
cursor: raw.to_string(),
});
}
Ok(SourceCursor::PostgresLsn(raw.to_string()))
}
fn outbox_envelope(row: &OutboxRow) -> Result<ChangeEnvelope, ForceSyncError> {
let payload: Value = serde_json::from_str(&row.payload_text)?;
let sync_key = SyncKey::new(
row.tenant.clone(),
row.object_name.clone(),
row.external_id.clone(),
)?;
let operation = outbox_operation(&row.op, row.tombstone)?;
Ok(ChangeEnvelope::new(
sync_key,
SourceSystem::Postgres,
operation,
row.created_at,
payload,
)
.with_cursor(outbox_source_cursor(&row.source_cursor)?))
}
const fn row_content_error(error: &ForceSyncError) -> bool {
matches!(
error,
ForceSyncError::Json(_)
| ForceSyncError::InvalidOutboxOperation { .. }
| ForceSyncError::InvalidOutboxCursor { .. }
| ForceSyncError::EmptySyncKeyPart { .. }
)
}
async fn quarantine_row<C>(
client: &C,
row: &OutboxRow,
error: &ForceSyncError,
) -> Result<(), ForceSyncError>
where
C: GenericClient + Sync + ?Sized,
{
let payload = serde_json::from_str::<Value>(&row.payload_text).ok();
let dead_letter = DeadLetter {
task_id: None,
tenant: Some(row.tenant.clone()),
object_name: Some(row.object_name.clone()),
external_id: Some(row.external_id.clone()),
error_message: error.to_string(),
payload,
};
crate::store::pg::dead_letter::insert_dead_letter_in_tx(client, &dead_letter).await?;
client
.execute(
"update force_sync_outbox
set processed_at = now()
where outbox_id = $1
and processed_at is null",
&[&row.outbox_id],
)
.await?;
Ok(())
}
async fn capture_batch_in_tx<C>(
client: &C,
limit: i64,
priority: i32,
) -> Result<usize, ForceSyncError>
where
C: GenericClient + Sync + ?Sized,
{
if limit <= 0 {
return Ok(0);
}
let rows = client
.query(
"select outbox_id, tenant, object_name, external_id, source_cursor, op, tombstone, payload::text as payload_text, created_at
from force_sync_outbox
where processed_at is null
order by created_at asc, outbox_id asc
for update skip locked
limit $1",
&[&limit],
)
.await?;
let mut processed = 0usize;
for row in rows {
let outbox_row = OutboxRow {
outbox_id: row.get("outbox_id"),
tenant: row.get("tenant"),
object_name: row.get("object_name"),
external_id: row.get("external_id"),
source_cursor: row.get("source_cursor"),
op: row.get("op"),
tombstone: row.get("tombstone"),
payload_text: row.get("payload_text"),
created_at: row.get("created_at"),
};
match outbox_envelope(&outbox_row) {
Ok(envelope) => match PgStore::append_journal_if_new_in_tx(client, &envelope).await? {
AppendResult::Inserted { journal_id } => {
PgStore::enqueue_apply_task_in_tx(client, journal_id, priority).await?;
client
.execute(
"update force_sync_outbox
set processed_at = now()
where outbox_id = $1
and processed_at is null",
&[&outbox_row.outbox_id],
)
.await?;
processed += 1;
}
AppendResult::Duplicate => {
client
.execute(
"update force_sync_outbox
set processed_at = now()
where outbox_id = $1
and processed_at is null",
&[&outbox_row.outbox_id],
)
.await?;
processed += 1;
}
},
Err(error) if row_content_error(&error) => {
quarantine_row(client, &outbox_row, &error).await?;
processed += 1;
}
Err(error) => return Err(error),
}
}
Ok(processed)
}
pub async fn capture_batch(
store: &PgStore,
limit: i64,
priority: i32,
) -> Result<usize, ForceSyncError> {
store
.with_transaction(|tx| {
async move { capture_batch_in_tx(tx, limit, priority).await }.boxed()
})
.await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{ChangeOperation, SourceCursor};
#[test]
fn outbox_operation_upsert_not_tombstone() {
let Ok(op) = outbox_operation("upsert", false) else {
panic!("expected Ok for upsert/false");
};
assert_eq!(op, ChangeOperation::Upsert);
}
#[test]
fn outbox_operation_delete_tombstone() {
let Ok(op) = outbox_operation("delete", true) else {
panic!("expected Ok for delete/true");
};
assert_eq!(op, ChangeOperation::Delete);
}
#[test]
fn outbox_operation_upsert_tombstone_is_invalid() {
assert!(matches!(
outbox_operation("upsert", true),
Err(ForceSyncError::InvalidOutboxOperation { .. })
));
}
#[test]
fn outbox_operation_delete_not_tombstone_is_invalid() {
assert!(matches!(
outbox_operation("delete", false),
Err(ForceSyncError::InvalidOutboxOperation { .. })
));
}
#[test]
fn outbox_operation_unknown_op_is_invalid() {
assert!(matches!(
outbox_operation("insert", false),
Err(ForceSyncError::InvalidOutboxOperation { .. })
));
}
#[test]
fn outbox_source_cursor_plain_lsn_succeeds() {
let Ok(cursor) = outbox_source_cursor("0/16B3740") else {
panic!("expected Ok for plain LSN");
};
assert!(matches!(cursor, SourceCursor::PostgresLsn(ref lsn) if lsn == "0/16B3740"));
}
#[test]
fn outbox_source_cursor_rejects_postgres_lsn_prefix() {
assert!(matches!(
outbox_source_cursor("postgres-lsn:0/16B3740"),
Err(ForceSyncError::InvalidOutboxCursor { .. })
));
}
#[test]
fn outbox_source_cursor_rejects_salesforce_replay_id_prefix() {
assert!(matches!(
outbox_source_cursor("salesforce-replay-id:42"),
Err(ForceSyncError::InvalidOutboxCursor { .. })
));
}
#[test]
fn outbox_source_cursor_rejects_snapshot_prefix() {
assert!(matches!(
outbox_source_cursor("snapshot:abc"),
Err(ForceSyncError::InvalidOutboxCursor { .. })
));
}
#[test]
fn row_content_error_recognizes_json_error() {
let Err(json_err) = serde_json::from_str::<Value>("{{invalid}}") else {
panic!("expected invalid JSON to fail");
};
let err: ForceSyncError = json_err.into();
assert!(row_content_error(&err));
}
#[test]
fn row_content_error_recognizes_invalid_outbox_operation() {
let err = ForceSyncError::InvalidOutboxOperation {
op: "bad".to_string(),
};
assert!(row_content_error(&err));
}
#[test]
fn row_content_error_recognizes_invalid_outbox_cursor() {
let err = ForceSyncError::InvalidOutboxCursor {
cursor: "bad".to_string(),
};
assert!(row_content_error(&err));
}
#[test]
fn row_content_error_recognizes_empty_sync_key_part() {
let err = ForceSyncError::EmptySyncKeyPart { part: "tenant" };
assert!(row_content_error(&err));
}
#[test]
fn row_content_error_rejects_missing_source_cursor() {
let err = ForceSyncError::MissingSourceCursor;
assert!(!row_content_error(&err));
}
}