database_replicator/xmin/
reconciler.rs1use anyhow::{Context, Result};
5use std::collections::HashSet;
6use tokio_postgres::types::ToSql;
7use tokio_postgres::Client;
8
9use super::writer::ChangeWriter;
10
11pub struct Reconciler<'a> {
17 source_client: &'a Client,
18 target_client: &'a Client,
19}
20
21impl<'a> Reconciler<'a> {
22 pub fn new(source_client: &'a Client, target_client: &'a Client) -> Self {
24 Self {
25 source_client,
26 target_client,
27 }
28 }
29
30 pub async fn find_orphaned_rows(
45 &self,
46 schema: &str,
47 table: &str,
48 primary_key_columns: &[String],
49 ) -> Result<Vec<Vec<String>>> {
50 let source_pks = self
52 .get_all_primary_keys(self.source_client, schema, table, primary_key_columns)
53 .await
54 .context("Failed to get source primary keys")?;
55
56 let target_pks = self
58 .get_all_primary_keys(self.target_client, schema, table, primary_key_columns)
59 .await
60 .context("Failed to get target primary keys")?;
61
62 let source_set: HashSet<Vec<String>> = source_pks.into_iter().collect();
64 let orphaned: Vec<Vec<String>> = target_pks
65 .into_iter()
66 .filter(|pk| !source_set.contains(pk))
67 .collect();
68
69 tracing::info!(
70 "Found {} orphaned rows in {}.{} that need deletion",
71 orphaned.len(),
72 schema,
73 table
74 );
75
76 Ok(orphaned)
77 }
78
79 pub async fn reconcile_table(
87 &self,
88 schema: &str,
89 table: &str,
90 primary_key_columns: &[String],
91 ) -> Result<u64> {
92 let orphaned = self
93 .find_orphaned_rows(schema, table, primary_key_columns)
94 .await?;
95
96 if orphaned.is_empty() {
97 tracing::info!("No orphaned rows found in {}.{}", schema, table);
98 return Ok(0);
99 }
100
101 let pk_values: Vec<Vec<Box<dyn ToSql + Sync + Send>>> = orphaned
103 .into_iter()
104 .map(|pk| {
105 pk.into_iter()
106 .map(|v| Box::new(v) as Box<dyn ToSql + Sync + Send>)
107 .collect()
108 })
109 .collect();
110
111 let writer = ChangeWriter::new(self.target_client);
113 let deleted = writer
114 .delete_rows(schema, table, primary_key_columns, pk_values)
115 .await?;
116
117 tracing::info!(
118 "Deleted {} orphaned rows from {}.{}",
119 deleted,
120 schema,
121 table
122 );
123
124 Ok(deleted)
125 }
126
127 async fn get_all_primary_keys(
129 &self,
130 client: &Client,
131 schema: &str,
132 table: &str,
133 primary_key_columns: &[String],
134 ) -> Result<Vec<Vec<String>>> {
135 let pk_cols: Vec<String> = primary_key_columns
136 .iter()
137 .map(|c| format!("\"{}\"::text", c))
138 .collect();
139
140 let query = format!(
141 "SELECT {} FROM \"{}\".\"{}\" ORDER BY {}",
142 pk_cols.join(", "),
143 schema,
144 table,
145 primary_key_columns
146 .iter()
147 .map(|c| format!("\"{}\"", c))
148 .collect::<Vec<_>>()
149 .join(", ")
150 );
151
152 let rows = client
153 .query(&query, &[])
154 .await
155 .with_context(|| format!("Failed to get primary keys from {}.{}", schema, table))?;
156
157 let pks: Vec<Vec<String>> = rows
158 .iter()
159 .map(|row| {
160 (0..primary_key_columns.len())
161 .map(|i| row.get::<_, String>(i))
162 .collect()
163 })
164 .collect();
165
166 Ok(pks)
167 }
168
169 pub async fn get_row_counts(&self, schema: &str, table: &str) -> Result<(i64, i64)> {
171 let query = format!("SELECT COUNT(*) FROM \"{}\".\"{}\"", schema, table);
172
173 let source_row = self
174 .source_client
175 .query_one(&query, &[])
176 .await
177 .context("Failed to get source row count")?;
178 let source_count: i64 = source_row.get(0);
179
180 let target_row = self
181 .target_client
182 .query_one(&query, &[])
183 .await
184 .context("Failed to get target row count")?;
185 let target_count: i64 = target_row.get(0);
186
187 Ok((source_count, target_count))
188 }
189
190 pub async fn table_exists_in_target(&self, schema: &str, table: &str) -> Result<bool> {
192 let query = "SELECT EXISTS (
193 SELECT 1 FROM information_schema.tables
194 WHERE table_schema = $1 AND table_name = $2
195 )";
196
197 let row = self
198 .target_client
199 .query_one(query, &[&schema, &table])
200 .await
201 .context("Failed to check if table exists")?;
202
203 Ok(row.get(0))
204 }
205}
206
/// Options for a reconciliation run.
///
/// None of these fields is read by code visible in this file; the field notes
/// below are inferred from the names and should be checked against the caller.
#[derive(Clone, Debug)]
pub struct ReconcileConfig {
    /// Presumably whether orphaned target rows should be deleted — TODO confirm.
    pub delete_orphans: bool,
    /// Presumably an upper bound on deletions, `None` meaning no bound — TODO confirm.
    pub max_deletes: Option<usize>,
    /// Presumably names of tables to leave out of reconciliation — TODO confirm.
    pub skip_tables: Vec<String>,
}

impl Default for ReconcileConfig {
    /// Defaults: `delete_orphans` on, no `max_deletes`, empty `skip_tables`.
    fn default() -> Self {
        ReconcileConfig {
            delete_orphans: true,
            max_deletes: None,
            skip_tables: vec![],
        }
    }
}
227
/// Outcome of reconciling a single table.
#[derive(Clone, Debug)]
pub struct ReconcileResult {
    /// Schema of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Row count recorded for the source.
    pub source_count: i64,
    /// Row count recorded for the target.
    pub target_count: i64,
    /// Number of orphaned rows recorded for the target.
    pub orphaned_count: usize,
    /// Number of rows recorded as deleted.
    pub deleted_count: u64,
}

impl ReconcileResult {
    /// Returns `true` when the two row counts are equal and no orphaned row
    /// was recorded. `deleted_count` is not taken into account.
    pub fn is_in_sync(&self) -> bool {
        let counts_match = self.source_count == self.target_count;
        let no_orphans = self.orphaned_count == 0;
        counts_match && no_orphans
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a result for `public.users` with the given counts and nothing deleted.
    fn users_result(
        source_count: i64,
        target_count: i64,
        orphaned_count: usize,
    ) -> ReconcileResult {
        ReconcileResult {
            schema: "public".to_string(),
            table: "users".to_string(),
            source_count,
            target_count,
            orphaned_count,
            deleted_count: 0,
        }
    }

    /// The default configuration deletes orphans, has no cap and skips nothing.
    #[test]
    fn test_reconcile_config_default() {
        let ReconcileConfig {
            delete_orphans,
            max_deletes,
            skip_tables,
        } = ReconcileConfig::default();

        assert!(delete_orphans);
        assert!(max_deletes.is_none());
        assert!(skip_tables.is_empty());
    }

    /// Equal counts with no orphans count as in sync.
    #[test]
    fn test_reconcile_result_in_sync() {
        let outcome = users_result(100, 100, 0);
        assert!(outcome.is_in_sync());
    }

    /// Extra target rows that are orphaned are not in sync.
    #[test]
    fn test_reconcile_result_not_in_sync() {
        let outcome = users_result(100, 105, 5);
        assert!(!outcome.is_in_sync());
    }
}