// database_replicator/commands/init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Get explicit database list from filters (include_databases or extracted from include_tables)
            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            // NOTE(review): this warns only when --no-sync was passed; if the intent is to
            // warn users who EXPECT continuous sync (enable_sync == true), the condition
            // may need inverting — confirm. Same pattern in the MongoDB/MySQL arms below.
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) pair so a rerun
    // of the same replication finds its previous progress.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Sanitize the globals dump in several passes so it restores cleanly on a
    // managed (non-superuser) target: ignore duplicate roles, strip SUPERUSER,
    // restricted GUC settings, restricted role grants, and CREATE TABLESPACE.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: discard any stale checkpoint and finish early.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    // Load or (re)create the resume checkpoint. A checkpoint whose metadata no
    // longer matches the current run configuration is discarded automatically.
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables subject to predicate filters are excluded from the bulk dump
        // and copied separately via copy_filtered_tables below.
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Record progress after every database so an interrupted run resumes here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    // Snapshot succeeded; the checkpoint is no longer needed (best-effort removal).
    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            // Downgrade gracefully to snapshot-only rather than failing the run.
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
590
591/// Replace the database name in a connection URL
592fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593    // Parse URL to find database name
594    // Format: postgresql://user:pass@host:port/database?params
595
596    // Split by '?' to separate params
597    let parts: Vec<&str> = url.split('?').collect();
598    let base_url = parts[0];
599    let params = if parts.len() > 1 {
600        Some(parts[1])
601    } else {
602        None
603    };
604
605    // Split base by '/' to get everything before database name
606    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607    if url_parts.len() != 2 {
608        anyhow::bail!("Invalid connection URL format");
609    }
610
611    // Reconstruct URL with new database name
612    let mut new_url = format!("{}/{}", url_parts[1], new_database);
613    if let Some(p) = params {
614        new_url = format!("{}?{}", new_url, p);
615    }
616
617    Ok(new_url)
618}
619
620/// Display database size estimates and prompt for confirmation
621///
622/// Shows a table with database names, sizes, and estimated replication times.
623/// Prompts the user to proceed with the replication.
624///
625/// # Arguments
626///
627/// * `sizes` - Vector of database size estimates
628///
629/// # Returns
630///
631/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
632///
633/// # Errors
634///
635/// Returns an error if stdin/stdout operations fail.
636fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637    use std::time::Duration;
638
639    // Calculate totals
640    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643    // Print table header
644    println!();
645    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646    println!("{}", "─".repeat(50));
647
648    // Print each database
649    for size in sizes {
650        println!(
651            "{:<20} {:<12} {:<15}",
652            size.name,
653            size.size_human,
654            migration::format_duration(size.estimated_duration)
655        );
656    }
657
658    // Print totals
659    println!("{}", "─".repeat(50));
660    println!(
661        "Total: {} (estimated {})",
662        migration::format_bytes(total_bytes),
663        migration::format_duration(total_duration)
664    );
665    println!();
666
667    // Prompt for confirmation
668    print!("Proceed with replication? [y/N]: ");
669    io::stdout().flush()?;
670
671    let mut input = String::new();
672    io::stdin()
673        .read_line(&mut input)
674        .context("Failed to read user input")?;
675
676    Ok(input.trim().to_lowercase() == "y")
677}
678
679/// Checks if a database is empty (no user tables)
680async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
681    // Need to connect to the specific database to check tables
682    let db_url = replace_database_in_url(target_url, db_name)?;
683    let client = postgres::connect_with_retry(&db_url).await?;
684
685    let query = "
686        SELECT COUNT(*)
687        FROM information_schema.tables
688        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
689    ";
690
691    let row = client.query_one(query, &[]).await?;
692    let count: i64 = row.get(0);
693
694    Ok(count == 0)
695}
696
697/// Prompts user to drop existing database
698fn prompt_drop_database(db_name: &str) -> Result<bool> {
699    use std::io::{self, Write};
700
701    print!(
702        "\nWarning: Database '{}' already exists on target and contains data.\n\
703         Drop and recreate database? This will delete all existing data. [y/N]: ",
704        db_name
705    );
706    io::stdout().flush()?;
707
708    let mut input = String::new();
709    io::stdin().read_line(&mut input)?;
710
711    Ok(input.trim().eq_ignore_ascii_case("y"))
712}
713
/// Drops a database if it exists
///
/// Terminates non-superuser sessions attached to `db_name`, verifies that no
/// sessions remain, then issues `DROP DATABASE IF EXISTS`.
///
/// # Arguments
///
/// * `target_conn` - Client connected to the target server
///   (NOTE(review): presumably connected to a maintenance database, not to
///   `db_name` itself — confirm at call sites)
/// * `db_name` - Database to drop; validated as a PostgreSQL identifier first
///
/// # Errors
///
/// Fails if the name does not pass identifier validation, if connections that
/// could not be terminated remain (e.g. superuser sessions on managed
/// PostgreSQL), or if any query fails.
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Validate database name to prevent SQL injection
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!("  Dropping existing database '{}'...", db_name);

    // Terminate existing connections to the database
    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
          AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Check if any connections remain (including SUPERUSER connections we couldn't terminate)
    // Also collect the distinct usernames so the error below can name them.
    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    // NULL when no rows were aggregated, hence Option.
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
             processes maintain superuser connections.\n\n\
             To resolve this:\n\
             1. Wait a few minutes and retry (system connections may be temporary)\n\
             2. Ask your database administrator to terminate the blocking sessions:\n\
                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    // Drop the database
    // quote_ident guards the identifier; parameters cannot be used for names in DDL.
    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!("  ✓ Database '{}' dropped", db_name);
    Ok(())
}
780
781/// Initial replication from SQLite to PostgreSQL
782///
783/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
784/// 1. Validates SQLite file exists and is readable
785/// 2. Validates PostgreSQL target connection
786/// 3. Lists all tables from SQLite database
787/// 4. For each table:
788///    - Converts rows to JSONB format
789///    - Creates JSONB table in PostgreSQL
790///    - Batch inserts all data
791///
792/// All SQLite data is stored as JSONB with metadata:
793/// - id: Original row ID (from ID column or row number)
794/// - data: Complete row as JSON object
795/// - _source_type: "sqlite"
796/// - _migrated_at: Timestamp of migration
797///
798/// # Arguments
799///
800/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
801/// * `target_url` - PostgreSQL connection string for target (Seren) database
802///
803/// # Returns
804///
805/// Returns `Ok(())` if migration completes successfully.
806///
807/// # Errors
808///
809/// This function will return an error if:
810/// - SQLite file doesn't exist or isn't readable
811/// - Cannot connect to target PostgreSQL database
812/// - Table conversion fails
813/// - Database creation or insert operations fail
814///
815/// # Examples
816///
817/// ```no_run
818/// # use anyhow::Result;
819/// # use database_replicator::commands::init::init_sqlite_to_postgres;
820/// # async fn example() -> Result<()> {
821/// init_sqlite_to_postgres(
822///     "database.db",
823///     "postgresql://user:pass@seren.example.com/targetdb"
824/// ).await?;
825/// # Ok(())
826/// # }
827/// ```
828pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
829    tracing::info!("Starting SQLite to PostgreSQL migration...");
830
831    // Step 1: Validate SQLite file
832    tracing::info!("Step 1/4: Validating SQLite database...");
833    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
834        .context("SQLite file validation failed")?;
835    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
836
837    // Step 2: Open SQLite connection (read-only)
838    tracing::info!("Step 2/4: Opening SQLite database...");
839    let sqlite_conn =
840        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
841    tracing::info!("  ✓ SQLite database opened (read-only mode)");
842
843    // Step 3: List all tables
844    tracing::info!("Step 3/4: Discovering tables...");
845    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
846        .context("Failed to list tables from SQLite database")?;
847
848    if tables.is_empty() {
849        tracing::warn!("⚠ No tables found in SQLite database");
850        tracing::info!("✅ Migration complete (no tables to migrate)");
851        return Ok(());
852    }
853
854    tracing::info!("Found {} table(s) to migrate", tables.len());
855
856    // Connect to PostgreSQL target
857    let target_client = postgres::connect_with_retry(target_url).await?;
858    tracing::info!("  ✓ Connected to PostgreSQL target");
859
860    // Step 4: Migrate each table
861    tracing::info!("Step 4/4: Migrating tables...");
862    for (idx, table_name) in tables.iter().enumerate() {
863        tracing::info!(
864            "Migrating table {}/{}: '{}'",
865            idx + 1,
866            tables.len(),
867            table_name
868        );
869
870        // Convert SQLite table to JSONB
871        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
872            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
873
874        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
875
876        // Create JSONB table in PostgreSQL
877        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
878            .await
879            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
880
881        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
882
883        if !rows.is_empty() {
884            // Batch insert all rows
885            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
886                .await
887                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
888
889            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
890        } else {
891            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
892        }
893    }
894
895    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
896    tracing::info!(
897        "   Migrated {} table(s) from '{}' to PostgreSQL",
898        tables.len(),
899        sqlite_path
900    );
901
902    Ok(())
903}
904
905/// Initial replication from MongoDB to PostgreSQL
906///
907/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
908/// 1. Validates MongoDB connection string
909/// 2. Connects to MongoDB and verifies connection
910/// 3. Extracts database name from connection string
911/// 4. Lists all collections from MongoDB database
912/// 5. For each collection:
913///    - Converts documents to JSONB format
914///    - Creates JSONB table in PostgreSQL
915///    - Batch inserts all data
916///
917/// All MongoDB data is stored as JSONB with metadata:
918/// - id: Original _id field (ObjectId → hex, String/Int → string)
919/// - data: Complete document as JSON object
920/// - _source_type: "mongodb"
921/// - _migrated_at: Timestamp of migration
922///
923/// # Arguments
924///
925/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
926/// * `target_url` - PostgreSQL connection string for target (Seren) database
927///
928/// # Returns
929///
930/// Returns `Ok(())` if migration completes successfully.
931///
932/// # Errors
933///
934/// This function will return an error if:
935/// - MongoDB connection string is invalid
936/// - Cannot connect to MongoDB database
937/// - Database name is not specified in connection string
938/// - Cannot connect to target PostgreSQL database
939/// - Collection conversion fails
940/// - Database creation or insert operations fail
941///
942/// # Examples
943///
944/// ```no_run
945/// # use anyhow::Result;
946/// # use database_replicator::commands::init::init_mongodb_to_postgres;
947/// # async fn example() -> Result<()> {
948/// init_mongodb_to_postgres(
949///     "mongodb://localhost:27017/mydb",
950///     "postgresql://user:pass@seren.example.com/targetdb"
951/// ).await?;
952/// # Ok(())
953/// # }
954/// ```
955pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
956    tracing::info!("Starting MongoDB to PostgreSQL migration...");
957
958    // Step 1: Validate and connect to MongoDB
959    tracing::info!("Step 1/5: Validating MongoDB connection...");
960    let client = crate::mongodb::connect_mongodb(mongo_url)
961        .await
962        .context("MongoDB connection failed")?;
963    tracing::info!("  ✓ MongoDB connection validated");
964
965    // Step 2: Extract database name
966    tracing::info!("Step 2/5: Extracting database name...");
967    let db_name = crate::mongodb::extract_database_name(mongo_url)
968        .await
969        .context("Failed to parse MongoDB connection string")?
970        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
971    tracing::info!("  ✓ Database name: '{}'", db_name);
972
973    // Step 3: List all collections
974    tracing::info!("Step 3/5: Discovering collections...");
975    let db = client.database(&db_name);
976    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
977        .await
978        .context("Failed to list collections from MongoDB database")?;
979
980    if collections.is_empty() {
981        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
982        tracing::info!("✅ Migration complete (no collections to migrate)");
983        return Ok(());
984    }
985
986    tracing::info!("Found {} collection(s) to migrate", collections.len());
987
988    // Step 4: Connect to PostgreSQL target
989    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
990    let target_client = postgres::connect_with_retry(target_url).await?;
991    tracing::info!("  ✓ Connected to PostgreSQL target");
992
993    // Step 5: Migrate each collection
994    tracing::info!("Step 5/5: Migrating collections...");
995    for (idx, collection_name) in collections.iter().enumerate() {
996        tracing::info!(
997            "Migrating collection {}/{}: '{}'",
998            idx + 1,
999            collections.len(),
1000            collection_name
1001        );
1002
1003        // Convert MongoDB collection to JSONB
1004        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
1005            .await
1006            .with_context(|| {
1007                format!(
1008                    "Failed to convert collection '{}' to JSONB",
1009                    collection_name
1010                )
1011            })?;
1012
1013        tracing::info!(
1014            "  ✓ Converted {} documents from '{}'",
1015            rows.len(),
1016            collection_name
1017        );
1018
1019        // Create JSONB table in PostgreSQL
1020        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
1021            .await
1022            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
1023
1024        tracing::info!(
1025            "  ✓ Created JSONB table '{}' in PostgreSQL",
1026            collection_name
1027        );
1028
1029        if !rows.is_empty() {
1030            // Batch insert all rows
1031            crate::jsonb::writer::insert_jsonb_batch(
1032                &target_client,
1033                collection_name,
1034                rows,
1035                "mongodb",
1036            )
1037            .await
1038            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1039
1040            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1041        } else {
1042            tracing::info!(
1043                "  ✓ Collection '{}' is empty (no documents to insert)",
1044                collection_name
1045            );
1046        }
1047    }
1048
1049    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1050    tracing::info!(
1051        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1052        collections.len(),
1053        db_name
1054    );
1055
1056    Ok(())
1057}
1058
1059/// Initial replication from MySQL to PostgreSQL
1060///
1061/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1062/// 1. Validates MySQL connection string
1063/// 2. Connects to MySQL and verifies connection
1064/// 3. Extracts database name from connection string
1065/// 4. Lists all tables from MySQL database
1066/// 5. For each table:
1067///    - Converts rows to JSONB format
1068///    - Creates JSONB table in PostgreSQL
1069///    - Batch inserts all data
1070///
1071/// All MySQL data is stored as JSONB with metadata:
1072/// - id: Primary key or auto-generated ID
1073/// - data: Complete row as JSON object
1074/// - _source_type: "mysql"
1075/// - _migrated_at: Timestamp of replication
1076///
1077/// # Arguments
1078///
1079/// * `mysql_url` - MySQL connection string (mysql://...)
1080/// * `target_url` - PostgreSQL connection string for target (Seren) database
1081///
1082/// # Returns
1083///
1084/// Returns `Ok(())` if replication completes successfully.
1085///
1086/// # Errors
1087///
1088/// This function will return an error if:
1089/// - MySQL connection string is invalid
1090/// - Cannot connect to MySQL database
1091/// - Database name is not specified in connection string
1092/// - Cannot connect to target PostgreSQL database
1093/// - Table conversion fails
1094/// - Database creation or insert operations fail
1095///
1096/// # Examples
1097///
1098/// ```no_run
1099/// # use anyhow::Result;
1100/// # use database_replicator::commands::init::init_mysql_to_postgres;
1101/// # async fn example() -> Result<()> {
1102/// init_mysql_to_postgres(
1103///     "mysql://user:pass@localhost:3306/mydb",
1104///     "postgresql://user:pass@seren.example.com/targetdb"
1105/// ).await?;
1106/// # Ok(())
1107/// # }
1108/// ```
1109pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1110    tracing::info!("Starting MySQL to PostgreSQL replication...");
1111
1112    // Step 1: Validate and connect to MySQL
1113    tracing::info!("Step 1/5: Validating MySQL connection...");
1114    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1115        .await
1116        .context("MySQL connection failed")?;
1117    tracing::info!("  ✓ MySQL connection validated");
1118
1119    // Step 2: Extract database name
1120    tracing::info!("Step 2/5: Extracting database name...");
1121    let db_name = crate::mysql::extract_database_name(mysql_url)
1122        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1123    tracing::info!("  ✓ Database name: '{}'", db_name);
1124
1125    // Step 3: List all tables
1126    tracing::info!("Step 3/5: Discovering tables...");
1127    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1128        .await
1129        .context("Failed to list tables from MySQL database")?;
1130
1131    if tables.is_empty() {
1132        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1133        tracing::info!("✅ Replication complete (no tables to replicate)");
1134        return Ok(());
1135    }
1136
1137    tracing::info!("Found {} table(s) to replicate", tables.len());
1138
1139    // Step 4: Connect to PostgreSQL target
1140    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1141    let target_client = postgres::connect_with_retry(target_url).await?;
1142    tracing::info!("  ✓ Connected to PostgreSQL target");
1143
1144    // Step 5: Replicate each table
1145    tracing::info!("Step 5/5: Replicating tables...");
1146    for (idx, table_name) in tables.iter().enumerate() {
1147        tracing::info!(
1148            "Replicating table {}/{}: '{}'",
1149            idx + 1,
1150            tables.len(),
1151            table_name
1152        );
1153
1154        // Convert MySQL table to JSONB
1155        let rows =
1156            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1157                .await
1158                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1159
1160        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1161
1162        // Create JSONB table in PostgreSQL
1163        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1164            .await
1165            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1166
1167        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1168
1169        if !rows.is_empty() {
1170            // Batch insert all rows
1171            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1172                .await
1173                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1174
1175            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1176        } else {
1177            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1178        }
1179    }
1180
1181    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1182    tracing::info!(
1183        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1184        tables.len(),
1185        db_name
1186    );
1187
1188    Ok(())
1189}
1190
#[cfg(test)]
mod tests {
    use super::*;

    /// Full snapshot replication against live databases; ignored by default
    /// because it needs TEST_SOURCE_URL / TEST_TARGET_URL to be set.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome =
            init(&source_url, &target_url, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    /// The database segment of a URL is swapped with and without query params.
    #[test]
    fn test_replace_database_in_url() {
        // Query parameters must survive the swap
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // ...and so must URLs without any query string
        let bare = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(bare, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let target_url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables
        // This test just verifies the function doesn't crash
        assert!(database_is_empty(&target_url, "postgres").await.is_ok());
    }
}