// database_replicator/commands/init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
82#[allow(clippy::too_many_arguments)]
83pub async fn init(
84    source_url: &str,
85    target_url: &str,
86    skip_confirmation: bool,
87    filter: crate::filters::ReplicationFilter,
88    drop_existing: bool,
89    enable_sync: bool,
90    allow_resume: bool,
91    force_local: bool,
92) -> Result<()> {
93    tracing::info!("Starting initial replication...");
94
95    // Detect source database type and route to appropriate implementation
96    let source_type =
97        crate::detect_source_type(source_url).context("Failed to detect source database type")?;
98
99    match source_type {
100        crate::SourceType::PostgreSQL => {
101            // PostgreSQL to PostgreSQL replication (existing logic below)
102            tracing::info!("Source type: PostgreSQL");
103
104            // Run pre-flight checks before any destructive operations
105            tracing::info!("Running pre-flight checks...");
106
107            // Get explicit database list from filters (include_databases or extracted from include_tables)
108            let databases = filter.databases_to_check();
109            let preflight_result = crate::preflight::run_preflight_checks(
110                source_url,
111                target_url,
112                databases.as_deref(),
113            )
114            .await?;
115
116            preflight_result.print();
117
118            if !preflight_result.all_passed() {
119                // Check if we can auto-fallback to remote
120                if preflight_result.tool_version_incompatible
121                    && crate::utils::is_serendb_target(target_url)
122                    && !force_local
123                {
124                    println!();
125                    tracing::info!(
126                        "Tool version incompatible. Switching to SerenAI cloud execution..."
127                    );
128                    // Return special error that main.rs catches to trigger remote
129                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
130                }
131
132                // Cannot auto-fallback
133                if force_local {
134                    bail!(
135                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
136                         Fix the issues above or remove --local to allow remote execution."
137                    );
138                }
139
140                bail!("Pre-flight checks failed. Fix the issues above and retry.");
141            }
142
143            println!();
144        }
145        crate::SourceType::SQLite => {
146            // SQLite to PostgreSQL migration (simpler path)
147            tracing::info!("Source type: SQLite");
148
149            // SQLite migrations don't support PostgreSQL-specific features
150            if !filter.is_empty() {
151                tracing::warn!(
152                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
153                );
154            }
155            if drop_existing {
156                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
157            }
158            if !enable_sync {
159                tracing::warn!(
160                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
161                );
162            }
163
164            return init_sqlite_to_postgres(source_url, target_url).await;
165        }
166        crate::SourceType::MongoDB => {
167            // MongoDB to PostgreSQL migration (simpler path)
168            tracing::info!("Source type: MongoDB");
169
170            // MongoDB migrations don't support PostgreSQL-specific features
171            if !filter.is_empty() {
172                tracing::warn!(
173                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
174                );
175            }
176            if drop_existing {
177                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
178            }
179            if !enable_sync {
180                tracing::warn!(
181                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
182                );
183            }
184
185            return init_mongodb_to_postgres(source_url, target_url).await;
186        }
187        crate::SourceType::MySQL => {
188            // MySQL to PostgreSQL replication (simpler path)
189            tracing::info!("Source type: MySQL");
190
191            // MySQL replications don't support PostgreSQL-specific features
192            if !filter.is_empty() {
193                tracing::warn!(
194                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
195                );
196            }
197            if drop_existing {
198                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
199            }
200            if !enable_sync {
201                tracing::warn!(
202                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
203                );
204            }
205
206            return init_mysql_to_postgres(source_url, target_url).await;
207        }
208    }
209
210    // CRITICAL: Ensure source and target are different to prevent data loss
211    crate::utils::validate_source_target_different(source_url, target_url)
212        .context("Source and target validation failed")?;
213    tracing::info!("✓ Verified source and target are different databases");
214
215    // Create managed temporary directory for dump files
216    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
217    let temp_path =
218        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
219    tracing::debug!("Using temp directory: {}", temp_path.display());
220
221    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
222        .context("Failed to determine checkpoint location")?;
223
224    // Step 1: Dump global objects
225    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
226    let globals_file = temp_path.join("globals.sql");
227    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
228    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
229        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
230    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
231        .context("Failed to remove SUPERUSER from globals dump")?;
232    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
233        .context("Failed to remove restricted parameter settings from globals dump")?;
234    remove_restricted_role_grants(globals_file.to_str().unwrap())
235        .context("Failed to remove restricted role grants from globals dump")?;
236    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
237        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;
238
239    // Step 2: Restore global objects
240    tracing::info!("Step 2/4: Restoring global objects to target...");
241    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;
242
243    // Step 3: Discover and filter databases
244    tracing::info!("Step 3/4: Discovering databases...");
245    let all_databases = {
246        // Scope the connection so it's dropped before subprocess operations
247        let source_client = postgres::connect_with_retry(source_url).await?;
248        migration::list_databases(&source_client).await?
249    }; // Connection dropped here
250
251    // Apply filtering rules
252    let databases: Vec<_> = all_databases
253        .into_iter()
254        .filter(|db| filter.should_replicate_database(&db.name))
255        .collect();
256
257    if databases.is_empty() {
258        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
259        if filter.is_empty() {
260            tracing::warn!("⚠ No user databases found on source");
261            tracing::warn!("  This is unusual - the source database appears empty");
262            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
263        } else {
264            tracing::warn!("⚠ No databases matched the filter criteria");
265            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
266        }
267        tracing::info!("✅ Initial replication complete (no databases to replicate)");
268        return Ok(());
269    }
270
271    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
272    let filter_hash = filter.fingerprint();
273    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
274        source_url,
275        target_url,
276        filter_hash,
277        drop_existing,
278        enable_sync,
279    );
280
281    let mut checkpoint_state = if allow_resume {
282        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
283            Some(existing) => {
284                // Try to validate the checkpoint
285                match existing.validate(&checkpoint_metadata, &database_names) {
286                    Ok(()) => {
287                        // Validation succeeded - resume from checkpoint
288                        if existing.completed_count() > 0 {
289                            tracing::info!(
290                                "Resume checkpoint found: {}/{} databases already replicated",
291                                existing.completed_count(),
292                                existing.total_databases()
293                            );
294                        } else {
295                            tracing::info!(
296                                "Resume checkpoint found but no databases marked complete yet"
297                            );
298                        }
299                        existing
300                    }
301                    Err(e) => {
302                        // Validation failed - log warning and start fresh
303                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
304                        tracing::warn!(
305                            "  Previous run configuration differs from current configuration"
306                        );
307                        tracing::warn!("  - Schema-only tables may have changed");
308                        tracing::warn!("  - Time filters may have changed");
309                        tracing::warn!("  - Table selection may have changed");
310                        tracing::warn!("  Error: {}", e);
311                        tracing::info!("");
312                        tracing::info!(
313                            "✓ Automatically discarding old checkpoint and starting fresh"
314                        );
315                        checkpoint::remove_checkpoint(&checkpoint_path)?;
316                        checkpoint::InitCheckpoint::new(
317                            checkpoint_metadata.clone(),
318                            &database_names,
319                        )
320                    }
321                }
322            }
323            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
324        }
325    } else {
326        if checkpoint_path.exists() {
327            tracing::info!(
328                "--no-resume supplied: discarding previous checkpoint at {}",
329                checkpoint_path.display()
330            );
331        }
332        checkpoint::remove_checkpoint(&checkpoint_path)?;
333        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
334    };
335
336    // Persist baseline state so crashes before first database can resume cleanly
337    checkpoint_state
338        .save(&checkpoint_path)
339        .context("Failed to persist checkpoint state")?;
340
341    tracing::info!("Found {} database(s) to replicate", databases.len());
342
343    // Check if user specified a target database name that doesn't match source databases
344    // This is important because init preserves source database names during replication
345    if let Ok(target_parts) = crate::utils::parse_postgres_url(target_url) {
346        let target_db = &target_parts.database;
347        let source_db_names: Vec<&str> = databases.iter().map(|db| db.name.as_str()).collect();
348
349        if !source_db_names.contains(&target_db.as_str())
350            && target_db != "postgres"
351            && !target_db.is_empty()
352        {
353            println!();
354            println!("========================================");
355            println!("⚠️  IMPORTANT: Target database name ignored");
356            println!("========================================");
357            println!();
358            println!(
359                "You specified target database '{}', but replication preserves",
360                target_db
361            );
362            println!("source database names. Data will be replicated to:");
363            println!();
364            for db_name in &source_db_names {
365                println!("  → {}", db_name);
366            }
367            println!();
368            println!(
369                "The '{}' database in your target URL is used only for",
370                target_db
371            );
372            println!("the initial connection. Source databases will be created");
373            println!("on the target server with their original names.");
374            println!();
375            println!("When running 'sync' later, use the correct database name:");
376            if let Some(first_db) = source_db_names.first() {
377                println!("  --target \"postgresql://.../{}\"\n", first_db);
378            }
379            println!("========================================");
380            println!();
381        }
382    }
383
384    // Estimate database sizes and get confirmation
385    if !skip_confirmation {
386        tracing::info!("Analyzing database sizes...");
387        let size_estimates = {
388            // Scope the connection so it's dropped after size estimation
389            let source_client = postgres::connect_with_retry(source_url).await?;
390            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
391                .await?
392        }; // Connection dropped here
393
394        if !confirm_replication(&size_estimates)? {
395            bail!("Replication cancelled by user");
396        }
397    }
398
399    // Step 4: Replicate each database
400    tracing::info!("Step 4/4: Replicating databases...");
401    for (idx, db_info) in databases.iter().enumerate() {
402        let filtered_tables = filter.predicate_tables(&db_info.name);
403        if checkpoint_state.is_completed(&db_info.name) {
404            tracing::info!(
405                "Skipping database '{}' (already completed per checkpoint)",
406                db_info.name
407            );
408            continue;
409        }
410        tracing::info!(
411            "Replicating database {}/{}: '{}'",
412            idx + 1,
413            databases.len(),
414            db_info.name
415        );
416
417        // Build connection URLs for this specific database
418        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
419        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;
420
421        // Handle database creation atomically to avoid TOCTOU race condition
422        // Scope the connection so it's dropped before dump/restore subprocess operations
423        {
424            let target_client = postgres::connect_with_retry(target_url).await?;
425
426            // Validate database name to prevent SQL injection
427            crate::utils::validate_postgres_identifier(&db_info.name)
428                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;
429
430            // Try to create database atomically (avoids TOCTOU vulnerability)
431            let create_query = format!(
432                "CREATE DATABASE {}",
433                crate::utils::quote_ident(&db_info.name)
434            );
435            match target_client.execute(&create_query, &[]).await {
436                Ok(_) => {
437                    tracing::info!("  Created database '{}'", db_info.name);
438                }
439                Err(err) => {
440                    // Check if error is "database already exists" (error code 42P04)
441                    if let Some(db_error) = err.as_db_error() {
442                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
443                            // Database already exists - handle based on user preferences
444                            tracing::info!(
445                                "  Database '{}' already exists on target",
446                                db_info.name
447                            );
448
449                            // Check if empty by connecting to the SPECIFIC database
450                            // (target_client is connected to default db, not the target db)
451                            let is_empty = {
452                                let db_client =
453                                    postgres::connect_with_retry(&target_db_url).await?;
454                                database_is_empty(&db_client).await?
455                            }; // db_client dropped here
456
457                            if is_empty {
458                                tracing::info!(
459                                    "  Database '{}' is empty, proceeding with restore",
460                                    db_info.name
461                                );
462                            } else {
463                                // Database exists and has data
464                                let should_drop = if drop_existing {
465                                    // Force drop with --drop-existing flag
466                                    true
467                                } else if skip_confirmation {
468                                    // Auto-confirm drop with --yes flag (non-interactive)
469                                    tracing::info!(
470                                        "  Auto-confirming drop for database '{}' (--yes flag)",
471                                        db_info.name
472                                    );
473                                    true
474                                } else {
475                                    // Interactive mode: prompt user
476                                    prompt_drop_database(&db_info.name)?
477                                };
478
479                                if should_drop {
480                                    drop_database_if_exists(&target_client, &db_info.name).await?;
481
482                                    // Recreate the database
483                                    let create_query = format!(
484                                        "CREATE DATABASE {}",
485                                        crate::utils::quote_ident(&db_info.name)
486                                    );
487                                    target_client
488                                        .execute(&create_query, &[])
489                                        .await
490                                        .with_context(|| {
491                                            format!(
492                                                "Failed to create database '{}' after drop",
493                                                db_info.name
494                                            )
495                                        })?;
496                                    tracing::info!("  Created database '{}'", db_info.name);
497                                } else {
498                                    bail!("Aborted: Database '{}' already exists", db_info.name);
499                                }
500                            }
501                        } else {
502                            // Some other database error - propagate it
503                            return Err(err).with_context(|| {
504                                format!("Failed to create database '{}'", db_info.name)
505                            });
506                        }
507                    } else {
508                        // Not a database error - propagate it
509                        return Err(err).with_context(|| {
510                            format!("Failed to create database '{}'", db_info.name)
511                        });
512                    }
513                }
514            }
515        } // Connection dropped here before dump/restore operations
516
517        // Dump and restore schema
518        tracing::info!("  Dumping schema for '{}'...", db_info.name);
519        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
520        migration::dump_schema(
521            &source_db_url,
522            &db_info.name,
523            schema_file.to_str().unwrap(),
524            &filter,
525        )
526        .await?;
527
528        tracing::info!("  Restoring schema for '{}'...", db_info.name);
529        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;
530
531        // Dump and restore data (using directory format for parallel operations)
532        tracing::info!("  Dumping data for '{}'...", db_info.name);
533        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
534        migration::dump_data(
535            &source_db_url,
536            &db_info.name,
537            data_dir.to_str().unwrap(),
538            &filter,
539        )
540        .await?;
541
542        tracing::info!("  Restoring data for '{}'...", db_info.name);
543        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;
544
545        if !filtered_tables.is_empty() {
546            tracing::info!(
547                "  Applying filtered replication for {} table(s)...",
548                filtered_tables.len()
549            );
550            migration::filtered::copy_filtered_tables(
551                &source_db_url,
552                &target_db_url,
553                &filtered_tables,
554            )
555            .await?;
556        }
557
558        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);
559
560        checkpoint_state.mark_completed(&db_info.name);
561        checkpoint_state
562            .save(&checkpoint_path)
563            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
564    }
565
566    // Explicitly clean up temp directory
567    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
568    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
569        tracing::warn!("Failed to clean up temp directory: {}", e);
570        // Don't fail the entire operation if cleanup fails
571    }
572
573    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
574        tracing::warn!("Failed to remove checkpoint state: {}", err);
575    }
576
577    tracing::info!("✅ Initial replication complete");
578
579    // Check wal_level before attempting to set up sync
580    let mut should_enable_sync = enable_sync;
581    if enable_sync {
582        tracing::info!("Checking target wal_level for logical replication...");
583        let target_wal_level = {
584            // Scope the connection for quick wal_level check
585            let target_client = postgres::connect_with_retry(target_url).await?;
586            postgres::check_wal_level(&target_client).await?
587        }; // Connection dropped here
588
589        if target_wal_level != "logical" {
590            tracing::warn!("");
591            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
592            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
593            tracing::warn!("");
594            tracing::warn!("  To fix this:");
595            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
596            tracing::warn!("    2. Restart PostgreSQL server");
597            tracing::warn!(
598                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
599            );
600            tracing::warn!("");
601            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
602            should_enable_sync = false;
603        }
604    }
605
606    // Set up continuous logical replication if enabled
607    if should_enable_sync {
608        tracing::info!("");
609        tracing::info!("========================================");
610        tracing::info!("Step 5/5: Setting up continuous replication...");
611        tracing::info!("========================================");
612        tracing::info!("");
613
614        // Call sync command with the same filter
615        crate::commands::sync(
616            source_url,
617            target_url,
618            Some(filter),
619            None,
620            None,
621            None,
622            false,
623        )
624        .await
625        .context("Failed to set up continuous replication")?;
626
627        tracing::info!("");
628        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
629    } else {
630        tracing::info!("");
631        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
632        tracing::info!("  To enable it later, run:");
633        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
634    }
635
636    Ok(())
637}
638
639/// Replace the database name in a connection URL
640fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
641    // Parse URL to find database name
642    // Format: postgresql://user:pass@host:port/database?params
643
644    // Split by '?' to separate params
645    let parts: Vec<&str> = url.split('?').collect();
646    let base_url = parts[0];
647    let params = if parts.len() > 1 {
648        Some(parts[1])
649    } else {
650        None
651    };
652
653    // Split base by '/' to get everything before database name
654    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
655    if url_parts.len() != 2 {
656        anyhow::bail!("Invalid connection URL format");
657    }
658
659    // Reconstruct URL with new database name
660    let mut new_url = format!("{}/{}", url_parts[1], new_database);
661    if let Some(p) = params {
662        new_url = format!("{}?{}", new_url, p);
663    }
664
665    Ok(new_url)
666}
667
668/// Display database size estimates and prompt for confirmation
669///
670/// Shows a table with database names, sizes, and estimated replication times.
671/// Prompts the user to proceed with the replication.
672///
673/// # Arguments
674///
675/// * `sizes` - Vector of database size estimates
676///
677/// # Returns
678///
679/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
680///
681/// # Errors
682///
683/// Returns an error if stdin/stdout operations fail.
684fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
685    use std::time::Duration;
686
687    // Calculate totals
688    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
689    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
690
691    // Print table header
692    println!();
693    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
694    println!("{}", "─".repeat(50));
695
696    // Print each database
697    for size in sizes {
698        println!(
699            "{:<20} {:<12} {:<15}",
700            size.name,
701            size.size_human,
702            migration::format_duration(size.estimated_duration)
703        );
704    }
705
706    // Print totals
707    println!("{}", "─".repeat(50));
708    println!(
709        "Total: {} (estimated {})",
710        migration::format_bytes(total_bytes),
711        migration::format_duration(total_duration)
712    );
713    println!();
714
715    // Prompt for confirmation
716    print!("Proceed with replication? [y/N]: ");
717    io::stdout().flush()?;
718
719    let mut input = String::new();
720    io::stdin()
721        .read_line(&mut input)
722        .context("Failed to read user input")?;
723
724    Ok(input.trim().to_lowercase() == "y")
725}
726
727/// Checks if the currently connected database is empty (has no user tables).
728///
729/// Includes a 30-second timeout to prevent hanging on stale serverless connections.
730async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
731    let query = "
732        SELECT COUNT(*)
733        FROM information_schema.tables
734        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
735    ";
736
737    // Add timeout to prevent indefinite hang on stale serverless connections
738    let row = tokio::time::timeout(
739        std::time::Duration::from_secs(30),
740        client.query_one(query, &[]),
741    )
742    .await
743    .context("database_is_empty query timed out after 30 seconds")?
744    .context("Failed to query information_schema.tables")?;
745
746    let count: i64 = row.get(0);
747
748    Ok(count == 0)
749}
750
751/// Prompts user to drop existing database
752fn prompt_drop_database(db_name: &str) -> Result<bool> {
753    use std::io::{self, Write};
754
755    print!(
756        "\nWarning: Database '{}' already exists on target and contains data.\n\
757         Drop and recreate database? This will delete all existing data. [y/N]: ",
758        db_name
759    );
760    io::stdout().flush()?;
761
762    let mut input = String::new();
763    io::stdin().read_line(&mut input)?;
764
765    Ok(input.trim().eq_ignore_ascii_case("y"))
766}
767
/// Drops a database if it exists
///
/// Before dropping, terminates non-superuser connections to the database and
/// then verifies that no connections remain. If superuser sessions (which
/// cannot be terminated here) are still attached, bails with remediation
/// guidance instead of attempting a drop that would fail.
///
/// # Arguments
///
/// * `target_conn` - Connection to the target server. NOTE(review): this
///   connection must not itself be attached to `db_name`, or the final DROP
///   would fail — confirm callers connect via a maintenance database.
/// * `db_name` - Name of the database to drop (validated as an identifier)
///
/// # Errors
///
/// Returns an error if the name fails identifier validation, if untermininable
/// connections remain, or if any of the queries fail.
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Validate database name to prevent SQL injection
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!("  Dropping existing database '{}'...", db_name);

    // Terminate existing connections to the database
    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
    // Also skip our own backend via pg_backend_pid().
    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
          AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Check if any connections remain (including SUPERUSER connections we couldn't terminate)
    // The aggregated usernames are reported in the error message below.
    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    // STRING_AGG returns NULL when no rows matched, hence Option.
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
             processes maintain superuser connections.\n\n\
             To resolve this:\n\
             1. Wait a few minutes and retry (system connections may be temporary)\n\
             2. Ask your database administrator to terminate the blocking sessions:\n\
                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    // Drop the database
    // quote_ident is required here because DROP DATABASE cannot take a bind parameter.
    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!("  ✓ Database '{}' dropped", db_name);
    Ok(())
}
834
835/// Initial replication from SQLite to PostgreSQL
836///
837/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
838/// 1. Validates SQLite file exists and is readable
839/// 2. Validates PostgreSQL target connection
840/// 3. Lists all tables from SQLite database
841/// 4. For each table:
842///    - Converts rows to JSONB format
843///    - Creates JSONB table in PostgreSQL
844///    - Batch inserts all data
845///
846/// All SQLite data is stored as JSONB with metadata:
847/// - id: Original row ID (from ID column or row number)
848/// - data: Complete row as JSON object
849/// - _source_type: "sqlite"
850/// - _migrated_at: Timestamp of migration
851///
852/// # Arguments
853///
854/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
855/// * `target_url` - PostgreSQL connection string for target (Seren) database
856///
857/// # Returns
858///
859/// Returns `Ok(())` if migration completes successfully.
860///
861/// # Errors
862///
863/// This function will return an error if:
864/// - SQLite file doesn't exist or isn't readable
865/// - Cannot connect to target PostgreSQL database
866/// - Table conversion fails
867/// - Database creation or insert operations fail
868///
869/// # Examples
870///
871/// ```no_run
872/// # use anyhow::Result;
873/// # use database_replicator::commands::init::init_sqlite_to_postgres;
874/// # async fn example() -> Result<()> {
875/// init_sqlite_to_postgres(
876///     "database.db",
877///     "postgresql://user:pass@seren.example.com/targetdb"
878/// ).await?;
879/// # Ok(())
880/// # }
881/// ```
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    // Step 1: Validate SQLite file
    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    // NOTE(review): canonical_path is only used for this log line; open_sqlite
    // below receives the original `sqlite_path` — confirm that's intentional.
    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());

    // Step 2: Open SQLite connection (read-only)
    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!("  ✓ SQLite database opened (read-only mode)");

    // Step 3: List all tables
    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    // An empty database is not an error: report success with nothing to do.
    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    // Connect to PostgreSQL target (only after we know there is work to do).
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 4: Migrate each table
    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert SQLite table to JSONB
        // NOTE(review): this loads the whole table into memory at once —
        // confirm acceptable for expected database sizes.
        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}
963
964/// Initial replication from MongoDB to PostgreSQL
965///
966/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
967/// 1. Validates MongoDB connection string
968/// 2. Connects to MongoDB and verifies connection
969/// 3. Extracts database name from connection string
970/// 4. Lists all collections from MongoDB database
971/// 5. For each collection:
972///    - Converts documents to JSONB format
973///    - Creates JSONB table in PostgreSQL
974///    - Batch inserts all data
975///
976/// All MongoDB data is stored as JSONB with metadata:
977/// - id: Original _id field (ObjectId → hex, String/Int → string)
978/// - data: Complete document as JSON object
979/// - _source_type: "mongodb"
980/// - _migrated_at: Timestamp of migration
981///
982/// # Arguments
983///
984/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
985/// * `target_url` - PostgreSQL connection string for target (Seren) database
986///
987/// # Returns
988///
989/// Returns `Ok(())` if migration completes successfully.
990///
991/// # Errors
992///
993/// This function will return an error if:
994/// - MongoDB connection string is invalid
995/// - Cannot connect to MongoDB database
996/// - Database name is not specified in connection string
997/// - Cannot connect to target PostgreSQL database
998/// - Collection conversion fails
999/// - Database creation or insert operations fail
1000///
1001/// # Examples
1002///
1003/// ```no_run
1004/// # use anyhow::Result;
1005/// # use database_replicator::commands::init::init_mongodb_to_postgres;
1006/// # async fn example() -> Result<()> {
1007/// init_mongodb_to_postgres(
1008///     "mongodb://localhost:27017/mydb",
1009///     "postgresql://user:pass@seren.example.com/targetdb"
1010/// ).await?;
1011/// # Ok(())
1012/// # }
1013/// ```
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    // Step 1: Validate and connect to MongoDB
    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!("  ✓ MongoDB connection validated");

    // Step 2: Extract database name
    // Double `.context(...)` because extract_database_name returns a
    // Result<Option<String>>: first layer is parse failure, second is a URL
    // with no database segment.
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all collections
    tracing::info!("Step 3/5: Discovering collections...");
    // `db` handle is used later by the converter; listing goes through the client.
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    // An empty database is not an error: report success with nothing to do.
    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Migrate each collection
    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // Convert MongoDB collection to JSONB
        // NOTE(review): this loads the whole collection into memory at once —
        // confirm acceptable for expected collection sizes.
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            "  ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, collection_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", collection_name))?;

        tracing::info!(
            "  ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                "  ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}
1122
1123/// Initial replication from MySQL to PostgreSQL
1124///
1125/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1126/// 1. Validates MySQL connection string
1127/// 2. Connects to MySQL and verifies connection
1128/// 3. Extracts database name from connection string
1129/// 4. Lists all tables from MySQL database
1130/// 5. For each table:
1131///    - Converts rows to JSONB format
1132///    - Creates JSONB table in PostgreSQL
1133///    - Batch inserts all data
1134///
1135/// All MySQL data is stored as JSONB with metadata:
1136/// - id: Primary key or auto-generated ID
1137/// - data: Complete row as JSON object
1138/// - _source_type: "mysql"
1139/// - _migrated_at: Timestamp of replication
1140///
1141/// # Arguments
1142///
1143/// * `mysql_url` - MySQL connection string (mysql://...)
1144/// * `target_url` - PostgreSQL connection string for target (Seren) database
1145///
1146/// # Returns
1147///
1148/// Returns `Ok(())` if replication completes successfully.
1149///
1150/// # Errors
1151///
1152/// This function will return an error if:
1153/// - MySQL connection string is invalid
1154/// - Cannot connect to MySQL database
1155/// - Database name is not specified in connection string
1156/// - Cannot connect to target PostgreSQL database
1157/// - Table conversion fails
1158/// - Database creation or insert operations fail
1159///
1160/// # Examples
1161///
1162/// ```no_run
1163/// # use anyhow::Result;
1164/// # use database_replicator::commands::init::init_mysql_to_postgres;
1165/// # async fn example() -> Result<()> {
1166/// init_mysql_to_postgres(
1167///     "mysql://user:pass@localhost:3306/mydb",
1168///     "postgresql://user:pass@seren.example.com/targetdb"
1169/// ).await?;
1170/// # Ok(())
1171/// # }
1172/// ```
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    // Step 1: Validate and connect to MySQL
    // Connection is mutable because the MySQL driver's query APIs below
    // take `&mut` to the connection.
    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!("  ✓ MySQL connection validated");

    // Step 2: Extract database name
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all tables
    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    // An empty database is not an error: report success with nothing to do.
    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Replicate each table
    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert MySQL table to JSONB
        // NOTE(review): this loads the whole table into memory at once —
        // confirm acceptable for expected table sizes.
        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        "   Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}
1259
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end snapshot replication against live databases.
    /// Ignored by default; requires TEST_SOURCE_URL and TEST_TARGET_URL.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    /// Verifies database-name substitution with and without query parameters.
    #[test]
    fn test_replace_database_in_url() {
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let without_params = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(without_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    /// Smoke test: the emptiness query runs without error on a live target.
    /// Ignored by default; requires TEST_TARGET_URL.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // Connect to the database first
        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        // postgres database might be empty of user tables; we only verify
        // the function completes without crashing.
        assert!(database_is_empty(&client).await.is_ok());
    }
}