database_replicator/commands/
verify.rs1use crate::migration::{self, compare_tables, list_tables};
5use crate::postgres::connect;
6use anyhow::{Context, Result};
7use futures::stream::{self, StreamExt};
8use indicatif::{ProgressBar, ProgressStyle};
9
10pub async fn verify(
71 source_url: &str,
72 target_url: &str,
73 filter: Option<crate::filters::ReplicationFilter>,
74) -> Result<()> {
75 let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
76
77 tracing::info!("Starting data integrity verification...");
78 tracing::info!("");
79
80 crate::utils::validate_source_target_different(source_url, target_url)
82 .context("Source and target validation failed")?;
83 tracing::info!("✓ Verified source and target are different databases");
84 tracing::info!("");
85
86 tracing::info!("Connecting to source database...");
88 let source_client = connect(source_url)
89 .await
90 .context("Failed to connect to source database")?;
91
92 tracing::info!("Discovering databases on source...");
94 let all_databases = migration::list_databases(&source_client)
95 .await
96 .context("Failed to list databases on source")?;
97
98 let databases: Vec<_> = all_databases
100 .into_iter()
101 .filter(|db| filter.should_replicate_database(&db.name))
102 .collect();
103
104 if databases.is_empty() {
105 tracing::warn!("⚠ No databases matched the filter criteria");
106 tracing::warn!(" No verification to perform");
107 return Ok(());
108 }
109
110 tracing::info!("Found {} database(s) to verify:", databases.len());
111 for db in &databases {
112 tracing::info!(" - {}", db.name);
113 }
114 tracing::info!("");
115
116 let mut total_matches = 0;
118 let mut total_mismatches = 0;
119 let mut total_tables = 0;
120
121 for db in &databases {
123 tracing::info!("========================================");
124 tracing::info!("Database: '{}'", db.name);
125 tracing::info!("========================================");
126
127 let source_db_url = replace_database_in_url(source_url, &db.name).context(format!(
129 "Failed to build source URL for database '{}'",
130 db.name
131 ))?;
132 let target_db_url = replace_database_in_url(target_url, &db.name).context(format!(
133 "Failed to build target URL for database '{}'",
134 db.name
135 ))?;
136
137 tracing::info!("Connecting to database '{}'...", db.name);
139 let source_db_client = connect(&source_db_url).await.context(format!(
140 "Failed to connect to source database '{}'",
141 db.name
142 ))?;
143 let target_db_client = connect(&target_db_url).await.context(format!(
144 "Failed to connect to target database '{}'",
145 db.name
146 ))?;
147
148 tracing::info!("Discovering tables...");
150 let all_tables = list_tables(&source_db_client)
151 .await
152 .context(format!("Failed to list tables from database '{}'", db.name))?;
153
154 let tables: Vec<_> = all_tables
156 .into_iter()
157 .filter(|table| {
158 let table_name = if table.schema == "public" {
160 table.name.clone()
161 } else {
162 format!("{}.{}", table.schema, table.name)
163 };
164 filter.should_replicate_table(&db.name, &table_name)
165 })
166 .collect();
167
168 if tables.is_empty() {
169 tracing::warn!("⚠ No tables found to verify in database '{}'", db.name);
170 tracing::info!("");
171 continue;
172 }
173
174 tracing::info!("Found {} tables to verify", tables.len());
175 tracing::info!("Using parallel verification (concurrency: 4)");
176 tracing::info!("");
177
178 let progress = ProgressBar::new(tables.len() as u64);
180 progress.set_style(
181 ProgressStyle::default_bar()
182 .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}")
183 .unwrap()
184 .progress_chars("##-"),
185 );
186
187 let source_db_client2 = connect(&source_db_url).await.context(format!(
189 "Failed to create additional source connection 2 for database '{}'",
190 db.name
191 ))?;
192 let target_db_client2 = connect(&target_db_url).await.context(format!(
193 "Failed to create additional target connection 2 for database '{}'",
194 db.name
195 ))?;
196 let source_db_client3 = connect(&source_db_url).await.context(format!(
197 "Failed to create additional source connection 3 for database '{}'",
198 db.name
199 ))?;
200 let target_db_client3 = connect(&target_db_url).await.context(format!(
201 "Failed to create additional target connection 3 for database '{}'",
202 db.name
203 ))?;
204 let source_db_client4 = connect(&source_db_url).await.context(format!(
205 "Failed to create additional source connection 4 for database '{}'",
206 db.name
207 ))?;
208 let target_db_client4 = connect(&target_db_url).await.context(format!(
209 "Failed to create additional target connection 4 for database '{}'",
210 db.name
211 ))?;
212
213 let source_clients = [
215 source_db_client,
216 source_db_client2,
217 source_db_client3,
218 source_db_client4,
219 ];
220 let target_clients = [
221 target_db_client,
222 target_db_client2,
223 target_db_client3,
224 target_db_client4,
225 ];
226
227 let verification_results: Vec<_> = stream::iter(tables.iter().enumerate())
229 .map(|(idx, table)| {
230 let schema = table.schema.clone();
231 let name = table.name.clone();
232 let source_client = &source_clients[idx % source_clients.len()];
233 let target_client = &target_clients[idx % target_clients.len()];
234 let pb = progress.clone();
235
236 async move {
237 let result = compare_tables(source_client, target_client, &schema, &name).await;
238 pb.inc(1);
239 pb.set_message(format!("Verified {}.{}", schema, name));
240 (schema, name, result)
241 }
242 })
243 .buffer_unordered(4) .collect()
245 .await;
246
247 progress.finish_with_message(format!("Verification complete for database '{}'", db.name));
248 tracing::info!("");
249
250 let mut db_mismatches = 0;
252 let mut db_matches = 0;
253
254 for (schema, name, result) in verification_results {
255 match result {
256 Ok(checksum_result) => {
257 if checksum_result.is_valid() {
258 tracing::info!(
259 " ✓ {}.{}: Match ({} rows, checksum: {})",
260 schema,
261 name,
262 checksum_result.source_row_count,
263 &checksum_result.source_checksum[..8]
264 );
265 db_matches += 1;
266 } else if checksum_result.matches {
267 tracing::warn!(
268 " ⚠ {}.{}: Checksum matches but row count differs: source={}, target={}",
269 schema,
270 name,
271 checksum_result.source_row_count,
272 checksum_result.target_row_count
273 );
274 db_mismatches += 1;
275 } else {
276 tracing::error!(
277 " ✗ {}.{}: MISMATCH: source={} ({}), target={} ({})",
278 schema,
279 name,
280 &checksum_result.source_checksum[..8],
281 checksum_result.source_row_count,
282 &checksum_result.target_checksum[..8],
283 checksum_result.target_row_count
284 );
285 db_mismatches += 1;
286 }
287 }
288 Err(e) => {
289 let error_msg = format!("{}.{}: {}", schema, name, e);
290 tracing::error!(" ✗ ERROR: {}", error_msg);
291 db_mismatches += 1;
292 }
293 }
294 }
295
296 tracing::info!("");
298 tracing::info!("Database '{}' Summary:", db.name);
299 tracing::info!(" Total tables: {}", tables.len());
300 tracing::info!(" ✓ Matches: {}", db_matches);
301 tracing::info!(" ✗ Mismatches: {}", db_mismatches);
302 tracing::info!("");
303
304 total_tables += tables.len();
306 total_matches += db_matches;
307 total_mismatches += db_mismatches;
308 }
309
310 tracing::info!("========================================");
312 tracing::info!("Overall Verification Summary");
313 tracing::info!("========================================");
314 tracing::info!("Databases verified: {}", databases.len());
315 tracing::info!("Total tables: {}", total_tables);
316 tracing::info!("✓ Matches: {}", total_matches);
317 tracing::info!("✗ Mismatches: {}", total_mismatches);
318 tracing::info!("========================================");
319 tracing::info!("");
320
321 if total_mismatches > 0 {
322 tracing::error!("⚠ DATA INTEGRITY ISSUES DETECTED!");
323 tracing::error!(" {} table(s) have mismatched data", total_mismatches);
324 tracing::error!(" Review the logs above for details");
325 tracing::info!("");
326 tracing::info!("Possible causes:");
327 tracing::info!(" - Replication is still catching up (check 'status' command)");
328 tracing::info!(" - Data was modified on target after migration");
329 tracing::info!(" - Migration errors occurred during 'init' or 'sync'");
330 tracing::info!("");
331
332 anyhow::bail!("{} table(s) failed verification", total_mismatches);
333 } else {
334 tracing::info!("✓ ALL TABLES VERIFIED SUCCESSFULLY!");
335 tracing::info!(
336 " All {} tables match between source and target",
337 total_matches
338 );
339 tracing::info!(" Your migration data is intact and ready for cutover");
340 }
341
342 Ok(())
343}
344
345fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
356 let parts: Vec<&str> = url.splitn(2, '?').collect();
358 let base_url = parts[0];
359 let query_params = parts.get(1);
360
361 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
363
364 if url_parts.len() != 2 {
365 anyhow::bail!("Invalid connection URL format: cannot replace database name");
366 }
367
368 let new_url = if let Some(params) = query_params {
370 format!("{}/{}?{}", url_parts[1], new_db_name, params)
371 } else {
372 format!("{}/{}", url_parts[1], new_db_name)
373 };
374
375 Ok(new_url)
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the source and target connection URLs used by the ignored integration tests.
    ///
    /// Panics when `TEST_SOURCE_URL` or `TEST_TARGET_URL` is not set.
    fn integration_urls() -> (String, String) {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();
        (source_url, target_url)
    }

    /// Builds a `ReplicationFilter` whose first argument is the single name `database` and
    /// whose remaining three arguments are `None`.
    fn single_database_filter(database: &str) -> crate::filters::ReplicationFilter {
        crate::filters::ReplicationFilter::new(
            Some(vec![database.to_string()]),
            None,
            None,
            None,
        )
        .expect("Failed to create filter")
    }

    /// Runs `verify` without a filter; the outcome is only printed, not asserted.
    #[tokio::test]
    #[ignore]
    async fn test_verify_command() {
        let (source_url, target_url) = integration_urls();

        let outcome = verify(&source_url, &target_url, None).await;

        if let Err(e) = &outcome {
            println!("Verify command result: {:?}", e);
        } else {
            println!("✓ Verify command completed successfully");
        }
    }

    /// Checks database replacement with a port, with query parameters, and without a port.
    #[test]
    fn test_replace_database_in_url() {
        let cases = [
            (
                "postgresql://user:pass@localhost:5432/olddb",
                "postgresql://user:pass@localhost:5432/newdb",
            ),
            (
                "postgresql://user:pass@localhost:5432/olddb?sslmode=require",
                "postgresql://user:pass@localhost:5432/newdb?sslmode=require",
            ),
            (
                "postgresql://user:pass@localhost/olddb",
                "postgresql://user:pass@localhost/newdb",
            ),
        ];

        for (input, expected) in cases.iter().copied() {
            let rewritten = replace_database_in_url(input, "newdb").unwrap();
            assert_eq!(rewritten, expected);
        }
    }

    /// Runs `verify` with a filter built for `postgres`; the outcome is only printed.
    #[tokio::test]
    #[ignore]
    async fn test_verify_with_database_filter() {
        let (source_url, target_url) = integration_urls();
        let filter = single_database_filter("postgres");

        let outcome = verify(&source_url, &target_url, Some(filter)).await;

        if let Err(e) = &outcome {
            println!("Verify with database filter result: {:?}", e);
        } else {
            println!("✓ Verify with database filter completed successfully");
        }
    }

    /// Runs `verify` with a filter built for `nonexistent_database` and expects success.
    #[tokio::test]
    #[ignore]
    async fn test_verify_with_no_matching_databases() {
        let (source_url, target_url) = integration_urls();
        let filter = single_database_filter("nonexistent_database");

        let outcome = verify(&source_url, &target_url, Some(filter)).await;

        assert!(outcome.is_ok(), "Verify should succeed even with no matches");
    }
}