database_replicator/xmin/
reconciler.rs

1// ABOUTME: Reconciler for xmin-based sync - detects deleted rows in source
2// ABOUTME: Compares primary keys between source and target to find orphaned rows
3
4use anyhow::{Context, Result};
5use std::collections::HashSet;
6use tokio_postgres::types::ToSql;
7use tokio_postgres::Client;
8
9use super::writer::ChangeWriter;
10
11/// Reconciler detects rows that exist in target but not in source (deletions).
12///
13/// Since xmin-based sync only sees modified rows, it cannot detect deletions.
14/// The Reconciler performs periodic full-table primary key comparisons to find
15/// rows that need to be deleted from the target.
16pub struct Reconciler<'a> {
17    source_client: &'a Client,
18    target_client: &'a Client,
19}
20
21impl<'a> Reconciler<'a> {
22    /// Create a new Reconciler with source and target database connections.
23    pub fn new(source_client: &'a Client, target_client: &'a Client) -> Self {
24        Self {
25            source_client,
26            target_client,
27        }
28    }
29
30    /// Find rows that exist in target but not in source (orphaned rows).
31    ///
32    /// This performs a primary key comparison between source and target tables.
33    /// Returns the primary key values of rows that should be deleted from target.
34    ///
35    /// # Arguments
36    ///
37    /// * `schema` - Schema name
38    /// * `table` - Table name
39    /// * `primary_key_columns` - Primary key column names
40    ///
41    /// # Returns
42    ///
43    /// A vector of primary key value tuples for orphaned rows.
44    pub async fn find_orphaned_rows(
45        &self,
46        schema: &str,
47        table: &str,
48        primary_key_columns: &[String],
49    ) -> Result<Vec<Vec<String>>> {
50        // Get all PKs from source
51        let source_pks = self
52            .get_all_primary_keys(self.source_client, schema, table, primary_key_columns)
53            .await
54            .context("Failed to get source primary keys")?;
55
56        // Get all PKs from target
57        let target_pks = self
58            .get_all_primary_keys(self.target_client, schema, table, primary_key_columns)
59            .await
60            .context("Failed to get target primary keys")?;
61
62        // Find PKs in target that don't exist in source
63        let source_set: HashSet<Vec<String>> = source_pks.into_iter().collect();
64        let orphaned: Vec<Vec<String>> = target_pks
65            .into_iter()
66            .filter(|pk| !source_set.contains(pk))
67            .collect();
68
69        tracing::info!(
70            "Found {} orphaned rows in {}.{} that need deletion",
71            orphaned.len(),
72            schema,
73            table
74        );
75
76        Ok(orphaned)
77    }
78
79    /// Reconcile a table by deleting orphaned rows from target.
80    ///
81    /// This is a convenience method that finds orphaned rows and deletes them.
82    ///
83    /// # Returns
84    ///
85    /// The number of rows deleted from target.
86    pub async fn reconcile_table(
87        &self,
88        schema: &str,
89        table: &str,
90        primary_key_columns: &[String],
91    ) -> Result<u64> {
92        let orphaned = self
93            .find_orphaned_rows(schema, table, primary_key_columns)
94            .await?;
95
96        if orphaned.is_empty() {
97            tracing::info!("No orphaned rows found in {}.{}", schema, table);
98            return Ok(0);
99        }
100
101        // Convert string PKs to ToSql values
102        let pk_values: Vec<Vec<Box<dyn ToSql + Sync + Send>>> = orphaned
103            .into_iter()
104            .map(|pk| {
105                pk.into_iter()
106                    .map(|v| Box::new(v) as Box<dyn ToSql + Sync + Send>)
107                    .collect()
108            })
109            .collect();
110
111        // Delete orphaned rows
112        let writer = ChangeWriter::new(self.target_client);
113        let deleted = writer
114            .delete_rows(schema, table, primary_key_columns, pk_values)
115            .await?;
116
117        tracing::info!(
118            "Deleted {} orphaned rows from {}.{}",
119            deleted,
120            schema,
121            table
122        );
123
124        Ok(deleted)
125    }
126
127    /// Get all primary key values from a table.
128    async fn get_all_primary_keys(
129        &self,
130        client: &Client,
131        schema: &str,
132        table: &str,
133        primary_key_columns: &[String],
134    ) -> Result<Vec<Vec<String>>> {
135        let pk_cols: Vec<String> = primary_key_columns
136            .iter()
137            .map(|c| format!("\"{}\"::text", c))
138            .collect();
139
140        let query = format!(
141            "SELECT {} FROM \"{}\".\"{}\" ORDER BY {}",
142            pk_cols.join(", "),
143            schema,
144            table,
145            primary_key_columns
146                .iter()
147                .map(|c| format!("\"{}\"", c))
148                .collect::<Vec<_>>()
149                .join(", ")
150        );
151
152        let rows = client
153            .query(&query, &[])
154            .await
155            .with_context(|| format!("Failed to get primary keys from {}.{}", schema, table))?;
156
157        let pks: Vec<Vec<String>> = rows
158            .iter()
159            .map(|row| {
160                (0..primary_key_columns.len())
161                    .map(|i| row.get::<_, String>(i))
162                    .collect()
163            })
164            .collect();
165
166        Ok(pks)
167    }
168
169    /// Get count of rows in source and target for comparison.
170    pub async fn get_row_counts(&self, schema: &str, table: &str) -> Result<(i64, i64)> {
171        let query = format!("SELECT COUNT(*) FROM \"{}\".\"{}\"", schema, table);
172
173        let source_row = self
174            .source_client
175            .query_one(&query, &[])
176            .await
177            .context("Failed to get source row count")?;
178        let source_count: i64 = source_row.get(0);
179
180        let target_row = self
181            .target_client
182            .query_one(&query, &[])
183            .await
184            .context("Failed to get target row count")?;
185        let target_count: i64 = target_row.get(0);
186
187        Ok((source_count, target_count))
188    }
189
190    /// Check if a table exists in the target database.
191    pub async fn table_exists_in_target(&self, schema: &str, table: &str) -> Result<bool> {
192        let query = "SELECT EXISTS (
193            SELECT 1 FROM information_schema.tables
194            WHERE table_schema = $1 AND table_name = $2
195        )";
196
197        let row = self
198            .target_client
199            .query_one(query, &[&schema, &table])
200            .await
201            .context("Failed to check if table exists")?;
202
203        Ok(row.get(0))
204    }
205}
206
/// Tunable options controlling how a reconciliation pass behaves.
#[derive(Debug, Clone)]
pub struct ReconcileConfig {
    /// When `true`, orphaned rows are deleted; when `false`, the pass is a dry run.
    pub delete_orphans: bool,
    /// Upper bound on the number of orphans deleted in one batch (`None` = no limit).
    pub max_deletes: Option<usize>,
    /// Names of tables that reconciliation should skip.
    pub skip_tables: Vec<String>,
}

impl Default for ReconcileConfig {
    /// Defaults: deletion enabled, no delete limit, no skipped tables.
    fn default() -> Self {
        let delete_orphans = true;
        let max_deletes = None;
        let skip_tables = Vec::new();

        ReconcileConfig {
            delete_orphans,
            max_deletes,
            skip_tables,
        }
    }
}
227
/// Outcome of reconciling a single table.
#[derive(Debug, Clone)]
pub struct ReconcileResult {
    /// Schema of the reconciled table.
    pub schema: String,
    /// Name of the reconciled table.
    pub table: String,
    /// Row count reported for the source table.
    pub source_count: i64,
    /// Row count reported for the target table.
    pub target_count: i64,
    /// Number of orphaned rows that were found.
    pub orphaned_count: usize,
    /// Number of rows that were deleted.
    pub deleted_count: u64,
}

impl ReconcileResult {
    /// Returns `true` when no orphans were found and both row counts match.
    pub fn is_in_sync(&self) -> bool {
        let no_orphans = self.orphaned_count == 0;
        let counts_match = self.source_count == self.target_count;
        counts_match && no_orphans
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ReconcileResult` for `public.users` with nothing deleted.
    fn users_result(
        source_count: i64,
        target_count: i64,
        orphaned_count: usize,
    ) -> ReconcileResult {
        ReconcileResult {
            schema: String::from("public"),
            table: String::from("users"),
            source_count,
            target_count,
            orphaned_count,
            deleted_count: 0,
        }
    }

    /// The default configuration deletes orphans, has no limit, and skips nothing.
    #[test]
    fn test_reconcile_config_default() {
        let ReconcileConfig {
            delete_orphans,
            max_deletes,
            skip_tables,
        } = ReconcileConfig::default();

        assert!(delete_orphans);
        assert!(max_deletes.is_none());
        assert!(skip_tables.is_empty());
    }

    /// Equal counts and zero orphans means the table is in sync.
    #[test]
    fn test_reconcile_result_in_sync() {
        let outcome = users_result(100, 100, 0);
        assert!(outcome.is_in_sync());
    }

    /// Extra target rows reported as orphans means the table is out of sync.
    #[test]
    fn test_reconcile_result_not_in_sync() {
        let outcome = users_result(100, 105, 5);
        assert!(!outcome.is_in_sync());
    }
}
283}