Skip to main content

force_sync/
reconcile.rs

1//! Postgres-first reconcile helpers for force-sync.
2
3use futures::FutureExt;
4use tokio_postgres::GenericClient;
5
6use crate::{PgStore, error::ForceSyncError};
7
/// One record whose latest journal row needs to be repaired.
///
/// Produced by the drift-detection query: each value mirrors the columns
/// selected from the newest `sync_journal` row for a
/// `(tenant, object_name, external_id)` triple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DriftItem {
    /// Identifier of the journal row that a repair task should target.
    pub journal_id: i64,
    /// Tenant the record belongs to.
    pub tenant: String,
    /// Name of the Salesforce object the record is an instance of.
    pub object_name: String,
    /// External ID that identifies the record within its object.
    pub external_id: String,
    /// Payload hash stored on the latest journal row for this record.
    pub payload_hash: Vec<u8>,
}
22
23async fn detect_drift_query<C>(client: &C, limit: i64) -> Result<Vec<DriftItem>, ForceSyncError>
24where
25    C: GenericClient + Sync + ?Sized,
26{
27    if limit <= 0 {
28        return Ok(Vec::new());
29    }
30
31    let rows = client
32        .query(
33            "with latest as (
34                select distinct on (tenant, object_name, external_id)
35                    journal_id,
36                    tenant,
37                    object_name,
38                    external_id,
39                    payload_hash
40                from sync_journal
41                order by tenant, object_name, external_id, journal_id desc
42            )
43            select
44                latest.journal_id,
45                latest.tenant,
46                latest.object_name,
47                latest.external_id,
48                latest.payload_hash
49            from latest
50            left join sync_link link
51              on link.tenant = latest.tenant
52             and link.object_name = latest.object_name
53             and link.external_id = latest.external_id
54            where link.link_id is null
55               or link.last_payload_hash is distinct from latest.payload_hash
56            order by latest.journal_id asc
57            limit $1",
58            &[&limit],
59        )
60        .await?;
61
62    Ok(rows
63        .into_iter()
64        .map(|row| DriftItem {
65            journal_id: row.get(0),
66            tenant: row.get(1),
67            object_name: row.get(2),
68            external_id: row.get(3),
69            payload_hash: row.get(4),
70        })
71        .collect())
72}
73
74async fn enqueue_repair_in_tx<C>(client: &C, journal_id: i64) -> Result<i64, ForceSyncError>
75where
76    C: GenericClient + Sync + ?Sized,
77{
78    if let Some(task_id) = existing_repair_task_in_tx(client, journal_id).await? {
79        return Ok(task_id);
80    }
81
82    PgStore::enqueue_apply_task_in_tx(client, journal_id, 0).await
83}
84
85async fn existing_repair_task_in_tx<C>(
86    client: &C,
87    journal_id: i64,
88) -> Result<Option<i64>, ForceSyncError>
89where
90    C: GenericClient + Sync + ?Sized,
91{
92    let journal_key = journal_id.to_string();
93    let row = client
94        .query_opt(
95            "select task_id
96             from sync_task
97             where task_kind = 'apply'
98               and target_key = $1
99               and status in ('ready', 'leased')
100             order by task_id desc
101             limit 1",
102            &[&journal_key],
103        )
104        .await?;
105
106    Ok(row.map(|row| row.get(0)))
107}
108
109/// Detects drift by comparing the latest journal row for each record to the stored link.
110///
111/// # Errors
112///
113/// Returns an error if the query fails.
114pub async fn detect_drift(store: &PgStore, limit: i64) -> Result<Vec<DriftItem>, ForceSyncError> {
115    let client = store.pool().get().await?;
116    detect_drift_query(&**client, limit).await
117}
118
119/// Enqueues a repair task for a journal row.
120///
121/// # Errors
122///
123/// Returns an error if the task cannot be written.
124pub async fn enqueue_repair(store: &PgStore, journal_id: i64) -> Result<i64, ForceSyncError> {
125    store
126        .with_transaction(|tx| async move { enqueue_repair_in_tx(tx, journal_id).await }.boxed())
127        .await
128}
129
130/// Runs one reconciliation pass and enqueues repair tasks for detected drift.
131///
132/// # Errors
133///
134/// Returns an error if the detection or enqueue steps fail.
135pub async fn run_reconcile_once(store: &PgStore, limit: i64) -> Result<usize, ForceSyncError> {
136    store
137        .with_transaction(|tx| {
138            async move {
139                let drift = detect_drift_query(tx, limit).await?;
140                let mut enqueued = 0usize;
141
142                for item in drift {
143                    enqueue_repair_in_tx(tx, item.journal_id).await?;
144                    enqueued += 1;
145                }
146
147                Ok(enqueued)
148            }
149            .boxed()
150        })
151        .await
152}