database_replicator/migration/
checksum.rs1use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
/// Outcome of comparing a single table between a source and a target database.
#[derive(Debug, Clone, PartialEq)]
pub struct ChecksumResult {
    /// Schema containing the compared table.
    pub schema: String,
    /// Name of the compared table.
    pub table: String,
    /// Checksum string reported for the source side.
    pub source_checksum: String,
    /// Checksum string reported for the target side.
    pub target_checksum: String,
    /// Number of rows counted on the source side.
    pub source_row_count: i64,
    /// Number of rows counted on the target side.
    pub target_row_count: i64,
    /// Whether the source and target checksums were found to be equal.
    pub matches: bool,
}

impl ChecksumResult {
    /// Returns `true` only when the checksums are flagged as matching *and*
    /// both sides report the same row count.
    pub fn is_valid(&self) -> bool {
        let same_row_count = self.source_row_count == self.target_row_count;
        self.matches && same_row_count
    }
}
25
26pub async fn compute_table_checksum(
34 client: &Client,
35 schema: &str,
36 table: &str,
37) -> Result<(String, i64)> {
38 tracing::debug!("Computing checksum for {}.{}", schema, table);
39
40 let column_query = "
42 SELECT column_name
43 FROM information_schema.columns
44 WHERE table_schema = $1 AND table_name = $2
45 ORDER BY ordinal_position
46 ";
47
48 let column_rows = client
49 .query(column_query, &[&schema, &table])
50 .await
51 .context(format!("Failed to get columns for {}.{}", schema, table))?;
52
53 if column_rows.is_empty() {
54 anyhow::bail!("Table {}.{} has no columns", schema, table);
55 }
56
57 let columns: Vec<String> = column_rows
58 .iter()
59 .map(|row| row.get::<_, String>(0))
60 .collect();
61
62 let coalesce_exprs: Vec<String> = columns
64 .iter()
65 .map(|col| format!("COALESCE(\"{}\"::text, '')", col))
66 .collect();
67
68 let concat_expr = coalesce_exprs.join(" || '|' || ");
69
70 let order_by: Vec<String> = columns.iter().map(|col| format!("\"{}\"", col)).collect();
72 let order_by_clause = order_by.join(", ");
73
74 let checksum_query = format!(
76 "SELECT
77 md5(string_agg(row_data, '' ORDER BY row_num)) as checksum,
78 COUNT(*) as row_count
79 FROM (
80 SELECT
81 {} as row_data,
82 ROW_NUMBER() OVER (ORDER BY {}) as row_num
83 FROM \"{}\".\"{}\"
84 ) t",
85 concat_expr, order_by_clause, schema, table
86 );
87
88 let result = client
89 .query_one(&checksum_query, &[])
90 .await
91 .context(format!(
92 "Failed to compute checksum for {}.{}",
93 schema, table
94 ))?;
95
96 let checksum: Option<String> = result.get(0);
97 let row_count: i64 = result.get(1);
98
99 let checksum = checksum.unwrap_or_else(|| "empty".to_string());
101
102 tracing::debug!(
103 "Checksum for {}.{}: {} ({} rows)",
104 schema,
105 table,
106 checksum,
107 row_count
108 );
109
110 Ok((checksum, row_count))
111}
112
113pub async fn compare_tables(
115 source_client: &Client,
116 target_client: &Client,
117 schema: &str,
118 table: &str,
119) -> Result<ChecksumResult> {
120 tracing::info!("Comparing table {}.{}", schema, table);
121
122 let source_future = compute_table_checksum(source_client, schema, table);
124 let target_future = compute_table_checksum(target_client, schema, table);
125
126 let (source_result, target_result) = tokio::try_join!(source_future, target_future)?;
127
128 let (source_checksum, source_row_count) = source_result;
129 let (target_checksum, target_row_count) = target_result;
130
131 let matches = source_checksum == target_checksum;
132
133 Ok(ChecksumResult {
134 schema: schema.to_string(),
135 table: table.to_string(),
136 source_checksum,
137 target_checksum,
138 source_row_count,
139 target_row_count,
140 matches,
141 })
142}
143
// Integration tests; all are `#[ignore]`d because they need live databases
// reachable through the `TEST_SOURCE_URL` / `TEST_TARGET_URL` variables.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    /// A catalog table that always has rows yields a non-empty checksum.
    #[tokio::test]
    #[ignore]
    async fn test_compute_table_checksum() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let client = connect(&url).await.unwrap();

        let result = compute_table_checksum(&client, "pg_catalog", "pg_database").await;

        match &result {
            Ok((checksum, row_count)) => {
                println!("✓ Checksum computed: {} ({} rows)", checksum, row_count);
                assert!(!checksum.is_empty());
                assert!(*row_count > 0);
            }
            Err(e) => {
                println!("Error computing checksum: {:?}", e);
                panic!("Failed to compute checksum: {:?}", e);
            }
        }
    }

    /// An empty table yields the `"empty"` sentinel and a zero row count.
    #[tokio::test]
    #[ignore]
    async fn test_compute_empty_table_checksum() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let client = connect(&url).await.unwrap();

        client
            .execute("CREATE TEMP TABLE test_empty (id INT, name TEXT)", &[])
            .await
            .unwrap();

        // information_schema lists temp tables under the session's real temp
        // schema (pg_temp_N), not the `pg_temp` alias, so the column lookup
        // only finds the table when given the resolved schema name.
        let temp_schema: String = client
            .query_one(
                "SELECT nspname::text FROM pg_namespace WHERE oid = pg_my_temp_schema()",
                &[],
            )
            .await
            .unwrap()
            .get(0);

        let result = compute_table_checksum(&client, &temp_schema, "test_empty").await;

        match &result {
            Ok((checksum, row_count)) => {
                println!("✓ Empty table checksum: {} ({} rows)", checksum, row_count);
                assert_eq!(checksum, "empty");
                assert_eq!(*row_count, 0);
            }
            Err(e) => {
                println!("Error computing empty table checksum: {:?}", e);
                panic!("Failed: {:?}", e);
            }
        }
    }

    /// Comparing the same catalog table across two servers completes
    /// without error (the checksums themselves are not required to match).
    #[tokio::test]
    #[ignore]
    async fn test_compare_tables() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        let source_client = connect(&source_url).await.unwrap();
        let target_client = connect(&target_url).await.unwrap();

        let result =
            compare_tables(&source_client, &target_client, "pg_catalog", "pg_database").await;

        match &result {
            Ok(comparison) => {
                println!("✓ Table comparison completed");
                println!("  Schema: {}", comparison.schema);
                println!("  Table: {}", comparison.table);
                println!("  Source checksum: {}", comparison.source_checksum);
                println!("  Target checksum: {}", comparison.target_checksum);
                println!("  Source rows: {}", comparison.source_row_count);
                println!("  Target rows: {}", comparison.target_row_count);
                println!("  Matches: {}", comparison.matches);
            }
            Err(e) => {
                println!("Error comparing tables: {:?}", e);
                panic!("Failed to compare tables: {:?}", e);
            }
        }

        assert!(result.is_ok());
    }

    /// Two consecutive runs against the same table give the same result.
    #[tokio::test]
    #[ignore]
    async fn test_checksum_deterministic() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let client = connect(&url).await.unwrap();

        let (checksum1, rows1) = compute_table_checksum(&client, "pg_catalog", "pg_database")
            .await
            .unwrap();

        let (checksum2, rows2) = compute_table_checksum(&client, "pg_catalog", "pg_database")
            .await
            .unwrap();

        assert_eq!(checksum1, checksum2);
        assert_eq!(rows1, rows2);
        println!("✓ Checksum is deterministic: {}", checksum1);
    }
}