database_replicator/migration/
checksum.rs

1// ABOUTME: Data validation utilities using checksums
2// ABOUTME: Computes and compares table checksums for data integrity verification
3
4use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
/// Outcome of checking one table's checksum on the source and target databases.
#[derive(Debug, Clone, PartialEq)]
pub struct ChecksumResult {
    /// Schema that contains the compared table.
    pub schema: String,
    /// Name of the compared table.
    pub table: String,
    /// Checksum computed on the source database.
    pub source_checksum: String,
    /// Checksum computed on the target database.
    pub target_checksum: String,
    /// Row count observed on the source database.
    pub source_row_count: i64,
    /// Row count observed on the target database.
    pub target_row_count: i64,
    /// Whether the source and target checksums were found equal.
    pub matches: bool,
}

impl ChecksumResult {
    /// Reports whether the comparison passed: the checksums matched *and* both
    /// sides observed the same number of rows.
    pub fn is_valid(&self) -> bool {
        if !self.matches {
            return false;
        }
        self.source_row_count == self.target_row_count
    }
}
25
26/// Compute checksum for a table
27///
28/// This generates an MD5 checksum of all data in the table by:
29/// 1. Querying all columns in the table
30/// 2. Concatenating all column values for each row
31/// 3. Ordering by all columns for deterministic results
32/// 4. Computing MD5 hash of the aggregated data
33pub async fn compute_table_checksum(
34    client: &Client,
35    schema: &str,
36    table: &str,
37) -> Result<(String, i64)> {
38    tracing::debug!("Computing checksum for {}.{}", schema, table);
39
40    // Get all columns for the table
41    let column_query = "
42        SELECT column_name
43        FROM information_schema.columns
44        WHERE table_schema = $1 AND table_name = $2
45        ORDER BY ordinal_position
46    ";
47
48    let column_rows = client
49        .query(column_query, &[&schema, &table])
50        .await
51        .context(format!("Failed to get columns for {}.{}", schema, table))?;
52
53    if column_rows.is_empty() {
54        anyhow::bail!("Table {}.{} has no columns", schema, table);
55    }
56
57    let columns: Vec<String> = column_rows
58        .iter()
59        .map(|row| row.get::<_, String>(0))
60        .collect();
61
62    // Build COALESCE expressions to handle NULLs
63    let coalesce_exprs: Vec<String> = columns
64        .iter()
65        .map(|col| format!("COALESCE(\"{}\"::text, '')", col))
66        .collect();
67
68    let concat_expr = coalesce_exprs.join(" || '|' || ");
69
70    // Build ORDER BY clause using all columns
71    let order_by: Vec<String> = columns.iter().map(|col| format!("\"{}\"", col)).collect();
72    let order_by_clause = order_by.join(", ");
73
74    // Compute checksum: MD5 of all concatenated rows, ordered deterministically
75    let checksum_query = format!(
76        "SELECT
77            md5(string_agg(row_data, '' ORDER BY row_num)) as checksum,
78            COUNT(*) as row_count
79        FROM (
80            SELECT
81                {} as row_data,
82                ROW_NUMBER() OVER (ORDER BY {}) as row_num
83            FROM \"{}\".\"{}\"
84        ) t",
85        concat_expr, order_by_clause, schema, table
86    );
87
88    let result = client
89        .query_one(&checksum_query, &[])
90        .await
91        .context(format!(
92            "Failed to compute checksum for {}.{}",
93            schema, table
94        ))?;
95
96    let checksum: Option<String> = result.get(0);
97    let row_count: i64 = result.get(1);
98
99    // If table is empty, checksum will be NULL
100    let checksum = checksum.unwrap_or_else(|| "empty".to_string());
101
102    tracing::debug!(
103        "Checksum for {}.{}: {} ({} rows)",
104        schema,
105        table,
106        checksum,
107        row_count
108    );
109
110    Ok((checksum, row_count))
111}
112
113/// Compare a table between source and target databases
114pub async fn compare_tables(
115    source_client: &Client,
116    target_client: &Client,
117    schema: &str,
118    table: &str,
119) -> Result<ChecksumResult> {
120    tracing::info!("Comparing table {}.{}", schema, table);
121
122    // Compute checksums in parallel
123    let source_future = compute_table_checksum(source_client, schema, table);
124    let target_future = compute_table_checksum(target_client, schema, table);
125
126    let (source_result, target_result) = tokio::try_join!(source_future, target_future)?;
127
128    let (source_checksum, source_row_count) = source_result;
129    let (target_checksum, target_row_count) = target_result;
130
131    let matches = source_checksum == target_checksum;
132
133    Ok(ChecksumResult {
134        schema: schema.to_string(),
135        table: table.to_string(),
136        source_checksum,
137        target_checksum,
138        source_row_count,
139        target_row_count,
140        matches,
141    })
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    #[tokio::test]
    #[ignore]
    async fn test_compute_table_checksum() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let client = connect(&url).await.expect("failed to connect to source database");

        // pg_database always contains at least the database we are connected to.
        let (checksum, row_count) = compute_table_checksum(&client, "pg_catalog", "pg_database")
            .await
            .expect("Failed to compute checksum");

        println!("✓ Checksum computed: {} ({} rows)", checksum, row_count);
        assert!(!checksum.is_empty());
        assert!(row_count > 0);
    }

    #[tokio::test]
    #[ignore]
    async fn test_compute_empty_table_checksum() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let client = connect(&url).await.expect("failed to connect to source database");

        // Create a temporary empty table
        client
            .execute("CREATE TEMP TABLE test_empty (id INT, name TEXT)", &[])
            .await
            .expect("failed to create temp table");

        // information_schema reports temp tables under their real per-session schema
        // (pg_temp_N), not under the `pg_temp` alias, so resolve that name first.
        let temp_schema: String = client
            .query_one(
                "SELECT nspname::text FROM pg_namespace WHERE oid = pg_my_temp_schema()",
                &[],
            )
            .await
            .expect("failed to resolve temp schema")
            .get(0);

        let (checksum, row_count) = compute_table_checksum(&client, &temp_schema, "test_empty")
            .await
            .expect("Failed to compute empty table checksum");

        println!("✓ Empty table checksum: {} ({} rows)", checksum, row_count);
        assert_eq!(checksum, "empty");
        assert_eq!(row_count, 0);
    }

    #[tokio::test]
    #[ignore]
    async fn test_compare_tables() {
        // This test requires both source and target databases
        let source_url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let target_url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL must be set");

        let source_client = connect(&source_url)
            .await
            .expect("failed to connect to source database");
        let target_client = connect(&target_url)
            .await
            .expect("failed to connect to target database");

        // Compare a system table that should exist on both
        let comparison =
            compare_tables(&source_client, &target_client, "pg_catalog", "pg_database")
                .await
                .expect("Failed to compare tables");

        println!("✓ Table comparison completed");
        println!("  Schema: {}", comparison.schema);
        println!("  Table: {}", comparison.table);
        println!("  Source checksum: {}", comparison.source_checksum);
        println!("  Target checksum: {}", comparison.target_checksum);
        println!("  Source rows: {}", comparison.source_row_count);
        println!("  Target rows: {}", comparison.target_row_count);
        println!("  Matches: {}", comparison.matches);
    }

    #[tokio::test]
    #[ignore]
    async fn test_checksum_deterministic() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let client = connect(&url).await.expect("failed to connect to source database");

        // Compute checksum twice for the same table
        let (checksum1, rows1) = compute_table_checksum(&client, "pg_catalog", "pg_database")
            .await
            .expect("first checksum failed");

        let (checksum2, rows2) = compute_table_checksum(&client, "pg_catalog", "pg_database")
            .await
            .expect("second checksum failed");

        // Checksums should be identical (deterministic)
        assert_eq!(checksum1, checksum2);
        assert_eq!(rows1, rows2);
        println!("✓ Checksum is deterministic: {}", checksum1);
    }
}