database_replicator/commands/
verify.rs

// ABOUTME: Verify command implementation - validates data integrity
// ABOUTME: Compares table checksums between source and target databases
3
4use crate::migration::{self, compare_tables, list_tables};
5use crate::postgres::connect;
6use anyhow::{Context, Result};
7use futures::stream::{self, StreamExt};
8use indicatif::{ProgressBar, ProgressStyle};
9
10/// Verify data integrity between source and target databases
11///
12/// This command performs Phase 5 of the migration process:
13/// 1. Discovers databases and filters them based on criteria
14/// 2. For each filtered database:
15///    - Lists all tables and filters them
16///    - Compares each table's checksum between source and target
17///    - Reports any mismatches or missing tables
18/// 3. Provides overall validation summary across all databases
19///
20/// Uses parallel verification (up to 4 concurrent table checks) with progress bars
21/// for efficient processing of large databases.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `filter` - Optional replication filter for database and table selection
28///
29/// # Returns
30///
31/// Returns `Ok(())` if all tables match or after displaying verification results.
32///
33/// # Errors
34///
35/// This function will return an error if:
36/// - Cannot connect to source or target database
37/// - Cannot discover databases on source
38/// - Cannot list tables from source
39/// - Table comparison fails due to connection issues
40///
41/// # Examples
42///
43/// ```no_run
44/// # use anyhow::Result;
45/// # use database_replicator::commands::verify;
46/// # use database_replicator::filters::ReplicationFilter;
47/// # async fn example() -> Result<()> {
48/// // Verify all databases
49/// verify(
50///     "postgresql://user:pass@source.example.com/postgres",
51///     "postgresql://user:pass@target.example.com/postgres",
52///     None
53/// ).await?;
54///
55/// // Verify only specific databases
56/// let filter = ReplicationFilter::new(
57///     Some(vec!["mydb".to_string(), "analytics".to_string()]),
58///     None,
59///     None,
60///     None,
61/// )?;
62/// verify(
63///     "postgresql://user:pass@source.example.com/postgres",
64///     "postgresql://user:pass@target.example.com/postgres",
65///     Some(filter)
66/// ).await?;
67/// # Ok(())
68/// # }
69/// ```
70pub async fn verify(
71    source_url: &str,
72    target_url: &str,
73    filter: Option<crate::filters::ReplicationFilter>,
74) -> Result<()> {
75    let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
76
77    tracing::info!("Starting data integrity verification...");
78    tracing::info!("");
79
80    // Ensure source and target are different
81    crate::utils::validate_source_target_different(source_url, target_url)
82        .context("Source and target validation failed")?;
83    tracing::info!("✓ Verified source and target are different databases");
84    tracing::info!("");
85
86    // Connect to source database to discover databases
87    tracing::info!("Connecting to source database...");
88    let source_client = connect(source_url)
89        .await
90        .context("Failed to connect to source database")?;
91
92    // Discover and filter databases
93    tracing::info!("Discovering databases on source...");
94    let all_databases = migration::list_databases(&source_client)
95        .await
96        .context("Failed to list databases on source")?;
97
98    // Apply filtering rules
99    let databases: Vec<_> = all_databases
100        .into_iter()
101        .filter(|db| filter.should_replicate_database(&db.name))
102        .collect();
103
104    if databases.is_empty() {
105        tracing::warn!("⚠ No databases matched the filter criteria");
106        tracing::warn!("  No verification to perform");
107        return Ok(());
108    }
109
110    tracing::info!("Found {} database(s) to verify:", databases.len());
111    for db in &databases {
112        tracing::info!("  - {}", db.name);
113    }
114    tracing::info!("");
115
116    // Overall statistics across all databases
117    let mut total_matches = 0;
118    let mut total_mismatches = 0;
119    let mut total_tables = 0;
120
121    // Verify each database
122    for db in &databases {
123        tracing::info!("========================================");
124        tracing::info!("Database: '{}'", db.name);
125        tracing::info!("========================================");
126
127        // Build database-specific connection URLs
128        let source_db_url = replace_database_in_url(source_url, &db.name).context(format!(
129            "Failed to build source URL for database '{}'",
130            db.name
131        ))?;
132        let target_db_url = replace_database_in_url(target_url, &db.name).context(format!(
133            "Failed to build target URL for database '{}'",
134            db.name
135        ))?;
136
137        // Connect to the specific database on source and target
138        tracing::info!("Connecting to database '{}'...", db.name);
139        let source_db_client = connect(&source_db_url).await.context(format!(
140            "Failed to connect to source database '{}'",
141            db.name
142        ))?;
143        let target_db_client = connect(&target_db_url).await.context(format!(
144            "Failed to connect to target database '{}'",
145            db.name
146        ))?;
147
148        // List tables from source
149        tracing::info!("Discovering tables...");
150        let all_tables = list_tables(&source_db_client)
151            .await
152            .context(format!("Failed to list tables from database '{}'", db.name))?;
153
154        // Filter tables based on filter rules
155        let tables: Vec<_> = all_tables
156            .into_iter()
157            .filter(|table| {
158                // Build full table name in "database.table" format for filtering
159                let table_name = if table.schema == "public" {
160                    table.name.clone()
161                } else {
162                    format!("{}.{}", table.schema, table.name)
163                };
164                filter.should_replicate_table(&db.name, &table_name)
165            })
166            .collect();
167
168        if tables.is_empty() {
169            tracing::warn!("⚠ No tables found to verify in database '{}'", db.name);
170            tracing::info!("");
171            continue;
172        }
173
174        tracing::info!("Found {} tables to verify", tables.len());
175        tracing::info!("Using parallel verification (concurrency: 4)");
176        tracing::info!("");
177
178        // Create progress bar
179        let progress = ProgressBar::new(tables.len() as u64);
180        progress.set_style(
181            ProgressStyle::default_bar()
182                .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}")
183                .unwrap()
184                .progress_chars("##-"),
185        );
186
187        // Create additional connections for parallel processing (3 more pairs for concurrency=4)
188        let source_db_client2 = connect(&source_db_url).await.context(format!(
189            "Failed to create additional source connection 2 for database '{}'",
190            db.name
191        ))?;
192        let target_db_client2 = connect(&target_db_url).await.context(format!(
193            "Failed to create additional target connection 2 for database '{}'",
194            db.name
195        ))?;
196        let source_db_client3 = connect(&source_db_url).await.context(format!(
197            "Failed to create additional source connection 3 for database '{}'",
198            db.name
199        ))?;
200        let target_db_client3 = connect(&target_db_url).await.context(format!(
201            "Failed to create additional target connection 3 for database '{}'",
202            db.name
203        ))?;
204        let source_db_client4 = connect(&source_db_url).await.context(format!(
205            "Failed to create additional source connection 4 for database '{}'",
206            db.name
207        ))?;
208        let target_db_client4 = connect(&target_db_url).await.context(format!(
209            "Failed to create additional target connection 4 for database '{}'",
210            db.name
211        ))?;
212
213        // Store clients in an array for round-robin access (4 connections to match concurrency=4)
214        let source_clients = [
215            source_db_client,
216            source_db_client2,
217            source_db_client3,
218            source_db_client4,
219        ];
220        let target_clients = [
221            target_db_client,
222            target_db_client2,
223            target_db_client3,
224            target_db_client4,
225        ];
226
227        // Process tables in parallel with limited concurrency
228        let verification_results: Vec<_> = stream::iter(tables.iter().enumerate())
229            .map(|(idx, table)| {
230                let schema = table.schema.clone();
231                let name = table.name.clone();
232                let source_client = &source_clients[idx % source_clients.len()];
233                let target_client = &target_clients[idx % target_clients.len()];
234                let pb = progress.clone();
235
236                async move {
237                    let result = compare_tables(source_client, target_client, &schema, &name).await;
238                    pb.inc(1);
239                    pb.set_message(format!("Verified {}.{}", schema, name));
240                    (schema, name, result)
241                }
242            })
243            .buffer_unordered(4) // Process up to 4 tables concurrently
244            .collect()
245            .await;
246
247        progress.finish_with_message(format!("Verification complete for database '{}'", db.name));
248        tracing::info!("");
249
250        // Process results for this database
251        let mut db_mismatches = 0;
252        let mut db_matches = 0;
253
254        for (schema, name, result) in verification_results {
255            match result {
256                Ok(checksum_result) => {
257                    if checksum_result.is_valid() {
258                        tracing::info!(
259                            "  ✓ {}.{}: Match ({} rows, checksum: {})",
260                            schema,
261                            name,
262                            checksum_result.source_row_count,
263                            &checksum_result.source_checksum[..8]
264                        );
265                        db_matches += 1;
266                    } else if checksum_result.matches {
267                        tracing::warn!(
268                            "  ⚠ {}.{}: Checksum matches but row count differs: source={}, target={}",
269                            schema,
270                            name,
271                            checksum_result.source_row_count,
272                            checksum_result.target_row_count
273                        );
274                        db_mismatches += 1;
275                    } else {
276                        tracing::error!(
277                            "  ✗ {}.{}: MISMATCH: source={} ({}), target={} ({})",
278                            schema,
279                            name,
280                            &checksum_result.source_checksum[..8],
281                            checksum_result.source_row_count,
282                            &checksum_result.target_checksum[..8],
283                            checksum_result.target_row_count
284                        );
285                        db_mismatches += 1;
286                    }
287                }
288                Err(e) => {
289                    let error_msg = format!("{}.{}: {}", schema, name, e);
290                    tracing::error!("  ✗ ERROR: {}", error_msg);
291                    db_mismatches += 1;
292                }
293            }
294        }
295
296        // Display summary for this database
297        tracing::info!("");
298        tracing::info!("Database '{}' Summary:", db.name);
299        tracing::info!("  Total tables: {}", tables.len());
300        tracing::info!("  ✓ Matches: {}", db_matches);
301        tracing::info!("  ✗ Mismatches: {}", db_mismatches);
302        tracing::info!("");
303
304        // Update overall statistics
305        total_tables += tables.len();
306        total_matches += db_matches;
307        total_mismatches += db_mismatches;
308    }
309
310    // Display overall summary
311    tracing::info!("========================================");
312    tracing::info!("Overall Verification Summary");
313    tracing::info!("========================================");
314    tracing::info!("Databases verified: {}", databases.len());
315    tracing::info!("Total tables: {}", total_tables);
316    tracing::info!("✓ Matches: {}", total_matches);
317    tracing::info!("✗ Mismatches: {}", total_mismatches);
318    tracing::info!("========================================");
319    tracing::info!("");
320
321    if total_mismatches > 0 {
322        tracing::error!("⚠ DATA INTEGRITY ISSUES DETECTED!");
323        tracing::error!("  {} table(s) have mismatched data", total_mismatches);
324        tracing::error!("  Review the logs above for details");
325        tracing::info!("");
326        tracing::info!("Possible causes:");
327        tracing::info!("  - Replication is still catching up (check 'status' command)");
328        tracing::info!("  - Data was modified on target after migration");
329        tracing::info!("  - Migration errors occurred during 'init' or 'sync'");
330        tracing::info!("");
331
332        anyhow::bail!("{} table(s) failed verification", total_mismatches);
333    } else {
334        tracing::info!("✓ ALL TABLES VERIFIED SUCCESSFULLY!");
335        tracing::info!(
336            "  All {} tables match between source and target",
337            total_matches
338        );
339        tracing::info!("  Your migration data is intact and ready for cutover");
340    }
341
342    Ok(())
343}
344
345/// Replace the database name in a PostgreSQL connection URL
346///
347/// # Arguments
348///
349/// * `url` - PostgreSQL connection URL
350/// * `new_db_name` - New database name to use
351///
352/// # Returns
353///
354/// URL with the database name replaced
355fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
356    // Split into base URL and query parameters
357    let parts: Vec<&str> = url.splitn(2, '?').collect();
358    let base_url = parts[0];
359    let query_params = parts.get(1);
360
361    // Split base URL by '/' to replace the database name
362    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
363
364    if url_parts.len() != 2 {
365        anyhow::bail!("Invalid connection URL format: cannot replace database name");
366    }
367
368    // Rebuild URL with new database name
369    let new_url = if let Some(params) = query_params {
370        format!("{}/{}?{}", url_parts[1], new_db_name, params)
371    } else {
372        format!("{}/{}", url_parts[1], new_db_name)
373    };
374
375    Ok(new_url)
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Read a connection URL required by the `#[ignore]`d integration tests,
    /// panicking with the variable name when it is not set.
    fn required_env(var: &str) -> String {
        std::env::var(var).unwrap_or_else(|err| {
            panic!("{} must be set to run this integration test: {}", var, err)
        })
    }

    #[tokio::test]
    #[ignore]
    async fn test_verify_command() {
        // This test requires both source and target databases
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        let result = verify(&source_url, &target_url, None).await;

        match &result {
            Ok(_) => {
                println!("✓ Verify command completed successfully");
            }
            Err(e) => {
                println!("Verify command result: {:?}", e);
                // Mismatches are acceptable here (replication may not be set
                // up yet); the test only checks that the command runs.
            }
        }
    }

    #[test]
    fn test_replace_database_in_url() {
        // Basic URL
        let url = "postgresql://user:pass@localhost:5432/olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");

        // URL with query parameters
        let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            new_url,
            "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
        );

        // URL without port
        let url = "postgresql://user:pass@localhost/olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");

        // URL with a trailing slash and an empty database path
        let url = "postgresql://user:pass@localhost:5432/";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");

        // Query parameter values containing '/' must be left untouched
        let url = "postgresql://user@localhost/olddb?sslrootcert=/etc/ssl/ca.pem&sslmode=verify-full";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            new_url,
            "postgresql://user@localhost/newdb?sslrootcert=/etc/ssl/ca.pem&sslmode=verify-full"
        );

        // Key/value connection strings have no path to replace
        assert!(replace_database_in_url("host=localhost dbname=olddb", "newdb").is_err());
    }

    #[tokio::test]
    #[ignore]
    async fn test_verify_with_database_filter() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        // Create filter that includes only postgres database
        let filter = crate::filters::ReplicationFilter::new(
            Some(vec!["postgres".to_string()]),
            None,
            None,
            None,
        )
        .expect("Failed to create filter");

        let result = verify(&source_url, &target_url, Some(filter)).await;

        match &result {
            Ok(_) => println!("✓ Verify with database filter completed successfully"),
            Err(e) => {
                println!("Verify with database filter result: {:?}", e);
                // Mismatches are acceptable; this test exercises filtering.
            }
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_verify_with_no_matching_databases() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        // Create filter that matches no databases
        let filter = crate::filters::ReplicationFilter::new(
            Some(vec!["nonexistent_database".to_string()]),
            None,
            None,
            None,
        )
        .expect("Failed to create filter");

        let result = verify(&source_url, &target_url, Some(filter)).await;

        // Should succeed but show no verification (early return)
        assert!(result.is_ok(), "Verify should succeed even with no matches");
    }
}