1use futures::FutureExt;
4use tokio_postgres::GenericClient;
5
6use crate::{error::ForceSyncError, store::pg::PgStore};
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct DriftItem {
11 pub journal_id: i64,
13 pub tenant: String,
15 pub object_name: String,
17 pub external_id: String,
19 pub payload_hash: Vec<u8>,
21}
22
23async fn detect_drift_query<C>(client: &C, limit: i64) -> Result<Vec<DriftItem>, ForceSyncError>
24where
25 C: GenericClient + Sync + ?Sized,
26{
27 if limit <= 0 {
28 return Ok(Vec::new());
29 }
30
31 let rows = client
32 .query(
33 "with latest as (
34 select distinct on (tenant, object_name, external_id)
35 journal_id,
36 tenant,
37 object_name,
38 external_id,
39 payload_hash
40 from sync_journal
41 order by tenant, object_name, external_id, journal_id desc
42 )
43 select
44 latest.journal_id,
45 latest.tenant,
46 latest.object_name,
47 latest.external_id,
48 latest.payload_hash
49 from latest
50 left join sync_link link
51 on link.tenant = latest.tenant
52 and link.object_name = latest.object_name
53 and link.external_id = latest.external_id
54 where link.link_id is null
55 or link.last_payload_hash is distinct from latest.payload_hash
56 order by latest.journal_id asc
57 limit $1",
58 &[&limit],
59 )
60 .await?;
61
62 Ok(rows
63 .into_iter()
64 .map(|row| DriftItem {
65 journal_id: row.get(0),
66 tenant: row.get(1),
67 object_name: row.get(2),
68 external_id: row.get(3),
69 payload_hash: row.get(4),
70 })
71 .collect())
72}
73
74async fn enqueue_repair_in_tx<C>(client: &C, journal_id: i64) -> Result<i64, ForceSyncError>
75where
76 C: GenericClient + Sync + ?Sized,
77{
78 if let Some(task_id) = existing_repair_task_in_tx(client, journal_id).await? {
79 return Ok(task_id);
80 }
81
82 PgStore::enqueue_apply_task_in_tx(client, journal_id, 0).await
83}
84
85async fn existing_repair_task_in_tx<C>(
86 client: &C,
87 journal_id: i64,
88) -> Result<Option<i64>, ForceSyncError>
89where
90 C: GenericClient + Sync + ?Sized,
91{
92 let journal_key = journal_id.to_string();
93 let row = client
94 .query_opt(
95 "select task_id
96 from sync_task
97 where task_kind = 'apply'
98 and target_key = $1
99 and status in ('ready', 'leased')
100 order by task_id desc
101 limit 1",
102 &[&journal_key],
103 )
104 .await?;
105
106 Ok(row.map(|row| row.get(0)))
107}
108
109pub async fn detect_drift(store: &PgStore, limit: i64) -> Result<Vec<DriftItem>, ForceSyncError> {
115 let client = store.pool().get().await?;
116 detect_drift_query(&**client, limit).await
117}
118
119pub async fn enqueue_repair(store: &PgStore, journal_id: i64) -> Result<i64, ForceSyncError> {
125 store
126 .with_transaction(|tx| async move { enqueue_repair_in_tx(tx, journal_id).await }.boxed())
127 .await
128}
129
130pub async fn run_reconcile_once(store: &PgStore, limit: i64) -> Result<usize, ForceSyncError> {
136 store
137 .with_transaction(|tx| {
138 async move {
139 let drift = detect_drift_query(tx, limit).await?;
140 let mut enqueued = 0usize;
141
142 for item in drift {
143 enqueue_repair_in_tx(tx, item.journal_id).await?;
144 enqueued += 1;
145 }
146
147 Ok(enqueued)
148 }
149 .boxed()
150 })
151 .await
152}