database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
82#[allow(clippy::too_many_arguments)]
83pub async fn init(
84    source_url: &str,
85    target_url: &str,
86    skip_confirmation: bool,
87    filter: crate::filters::ReplicationFilter,
88    drop_existing: bool,
89    enable_sync: bool,
90    allow_resume: bool,
91    force_local: bool,
92) -> Result<()> {
93    tracing::info!("Starting initial replication...");
94
95    // Detect source database type and route to appropriate implementation
96    let source_type =
97        crate::detect_source_type(source_url).context("Failed to detect source database type")?;
98
99    match source_type {
100        crate::SourceType::PostgreSQL => {
101            // PostgreSQL to PostgreSQL replication (existing logic below)
102            tracing::info!("Source type: PostgreSQL");
103
104            // Run pre-flight checks before any destructive operations
105            tracing::info!("Running pre-flight checks...");
106
107            // Get explicit database list from filters (include_databases or extracted from include_tables)
108            let databases = filter.databases_to_check();
109            let preflight_result = crate::preflight::run_preflight_checks(
110                source_url,
111                target_url,
112                databases.as_deref(),
113            )
114            .await?;
115
116            preflight_result.print();
117
118            if !preflight_result.all_passed() {
119                // Check if we can auto-fallback to remote
120                if preflight_result.tool_version_incompatible
121                    && crate::utils::is_serendb_target(target_url)
122                    && !force_local
123                {
124                    println!();
125                    tracing::info!(
126                        "Tool version incompatible. Switching to SerenAI cloud execution..."
127                    );
128                    // Return special error that main.rs catches to trigger remote
129                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
130                }
131
132                // Cannot auto-fallback
133                if force_local {
134                    bail!(
135                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
136                         Fix the issues above or remove --local to allow remote execution."
137                    );
138                }
139
140                bail!("Pre-flight checks failed. Fix the issues above and retry.");
141            }
142
143            println!();
144        }
145        crate::SourceType::SQLite => {
146            // SQLite to PostgreSQL migration (simpler path)
147            tracing::info!("Source type: SQLite");
148
149            // SQLite migrations don't support PostgreSQL-specific features
150            if !filter.is_empty() {
151                tracing::warn!(
152                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
153                );
154            }
155            if drop_existing {
156                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
157            }
158            if !enable_sync {
159                tracing::warn!(
160                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
161                );
162            }
163
164            return init_sqlite_to_postgres(source_url, target_url).await;
165        }
166        crate::SourceType::MongoDB => {
167            // MongoDB to PostgreSQL migration (simpler path)
168            tracing::info!("Source type: MongoDB");
169
170            // MongoDB migrations don't support PostgreSQL-specific features
171            if !filter.is_empty() {
172                tracing::warn!(
173                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
174                );
175            }
176            if drop_existing {
177                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
178            }
179            if !enable_sync {
180                tracing::warn!(
181                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
182                );
183            }
184
185            return init_mongodb_to_postgres(source_url, target_url).await;
186        }
187        crate::SourceType::MySQL => {
188            // MySQL to PostgreSQL replication (simpler path)
189            tracing::info!("Source type: MySQL");
190
191            // MySQL replications don't support PostgreSQL-specific features
192            if !filter.is_empty() {
193                tracing::warn!(
194                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
195                );
196            }
197            if drop_existing {
198                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
199            }
200            if !enable_sync {
201                tracing::warn!(
202                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
203                );
204            }
205
206            return init_mysql_to_postgres(source_url, target_url).await;
207        }
208    }
209
210    // CRITICAL: Ensure source and target are different to prevent data loss
211    crate::utils::validate_source_target_different(source_url, target_url)
212        .context("Source and target validation failed")?;
213    tracing::info!("✓ Verified source and target are different databases");
214
215    // Create managed temporary directory for dump files
216    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
217    let temp_path =
218        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
219    tracing::debug!("Using temp directory: {}", temp_path.display());
220
221    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
222        .context("Failed to determine checkpoint location")?;
223
224    // Step 1: Dump global objects
225    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
226    let globals_file = temp_path.join("globals.sql");
227    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
228    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
229        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
230    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
231        .context("Failed to remove SUPERUSER from globals dump")?;
232    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
233        .context("Failed to remove restricted parameter settings from globals dump")?;
234    remove_restricted_role_grants(globals_file.to_str().unwrap())
235        .context("Failed to remove restricted role grants from globals dump")?;
236    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
237        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;
238
239    // Step 2: Restore global objects
240    tracing::info!("Step 2/4: Restoring global objects to target...");
241    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;
242
243    // Step 3: Discover and filter databases
244    tracing::info!("Step 3/4: Discovering databases...");
245    let all_databases = {
246        // Scope the connection so it's dropped before subprocess operations
247        let source_client = postgres::connect_with_retry(source_url).await?;
248        migration::list_databases(&source_client).await?
249    }; // Connection dropped here
250
251    // Apply filtering rules
252    let databases: Vec<_> = all_databases
253        .into_iter()
254        .filter(|db| filter.should_replicate_database(&db.name))
255        .collect();
256
257    if databases.is_empty() {
258        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
259        if filter.is_empty() {
260            tracing::warn!("⚠ No user databases found on source");
261            tracing::warn!("  This is unusual - the source database appears empty");
262            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
263        } else {
264            tracing::warn!("⚠ No databases matched the filter criteria");
265            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
266        }
267        tracing::info!("✅ Initial replication complete (no databases to replicate)");
268        return Ok(());
269    }
270
271    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
272    let filter_hash = filter.fingerprint();
273    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
274        source_url,
275        target_url,
276        filter_hash,
277        drop_existing,
278        enable_sync,
279    );
280
281    let mut checkpoint_state = if allow_resume {
282        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
283            Some(existing) => {
284                // Try to validate the checkpoint
285                match existing.validate(&checkpoint_metadata, &database_names) {
286                    Ok(()) => {
287                        // Validation succeeded - resume from checkpoint
288                        if existing.completed_count() > 0 {
289                            tracing::info!(
290                                "Resume checkpoint found: {}/{} databases already replicated",
291                                existing.completed_count(),
292                                existing.total_databases()
293                            );
294                        } else {
295                            tracing::info!(
296                                "Resume checkpoint found but no databases marked complete yet"
297                            );
298                        }
299                        existing
300                    }
301                    Err(e) => {
302                        // Validation failed - log warning and start fresh
303                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
304                        tracing::warn!(
305                            "  Previous run configuration differs from current configuration"
306                        );
307                        tracing::warn!("  - Schema-only tables may have changed");
308                        tracing::warn!("  - Time filters may have changed");
309                        tracing::warn!("  - Table selection may have changed");
310                        tracing::warn!("  Error: {}", e);
311                        tracing::info!("");
312                        tracing::info!(
313                            "✓ Automatically discarding old checkpoint and starting fresh"
314                        );
315                        checkpoint::remove_checkpoint(&checkpoint_path)?;
316                        checkpoint::InitCheckpoint::new(
317                            checkpoint_metadata.clone(),
318                            &database_names,
319                        )
320                    }
321                }
322            }
323            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
324        }
325    } else {
326        if checkpoint_path.exists() {
327            tracing::info!(
328                "--no-resume supplied: discarding previous checkpoint at {}",
329                checkpoint_path.display()
330            );
331        }
332        checkpoint::remove_checkpoint(&checkpoint_path)?;
333        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
334    };
335
336    // Persist baseline state so crashes before first database can resume cleanly
337    checkpoint_state
338        .save(&checkpoint_path)
339        .context("Failed to persist checkpoint state")?;
340
341    tracing::info!("Found {} database(s) to replicate", databases.len());
342
343    // Estimate database sizes and get confirmation
344    if !skip_confirmation {
345        tracing::info!("Analyzing database sizes...");
346        let size_estimates = {
347            // Scope the connection so it's dropped after size estimation
348            let source_client = postgres::connect_with_retry(source_url).await?;
349            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
350                .await?
351        }; // Connection dropped here
352
353        if !confirm_replication(&size_estimates)? {
354            bail!("Replication cancelled by user");
355        }
356    }
357
358    // Step 4: Replicate each database
359    tracing::info!("Step 4/4: Replicating databases...");
360    for (idx, db_info) in databases.iter().enumerate() {
361        let filtered_tables = filter.predicate_tables(&db_info.name);
362        if checkpoint_state.is_completed(&db_info.name) {
363            tracing::info!(
364                "Skipping database '{}' (already completed per checkpoint)",
365                db_info.name
366            );
367            continue;
368        }
369        tracing::info!(
370            "Replicating database {}/{}: '{}'",
371            idx + 1,
372            databases.len(),
373            db_info.name
374        );
375
376        // Build connection URLs for this specific database
377        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
378        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;
379
380        // Handle database creation atomically to avoid TOCTOU race condition
381        // Scope the connection so it's dropped before dump/restore subprocess operations
382        {
383            let target_client = postgres::connect_with_retry(target_url).await?;
384
385            // Validate database name to prevent SQL injection
386            crate::utils::validate_postgres_identifier(&db_info.name)
387                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;
388
389            // Try to create database atomically (avoids TOCTOU vulnerability)
390            let create_query = format!(
391                "CREATE DATABASE {}",
392                crate::utils::quote_ident(&db_info.name)
393            );
394            match target_client.execute(&create_query, &[]).await {
395                Ok(_) => {
396                    tracing::info!("  Created database '{}'", db_info.name);
397                }
398                Err(err) => {
399                    // Check if error is "database already exists" (error code 42P04)
400                    if let Some(db_error) = err.as_db_error() {
401                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
402                            // Database already exists - handle based on user preferences
403                            tracing::info!(
404                                "  Database '{}' already exists on target",
405                                db_info.name
406                            );
407
408                            // Check if empty (reuse existing connection to avoid pool exhaustion)
409                            if database_is_empty(&target_client).await? {
410                                tracing::info!(
411                                    "  Database '{}' is empty, proceeding with restore",
412                                    db_info.name
413                                );
414                            } else {
415                                // Database exists and has data
416                                let should_drop = if drop_existing {
417                                    // Force drop with --drop-existing flag
418                                    true
419                                } else if skip_confirmation {
420                                    // Auto-confirm drop with --yes flag (non-interactive)
421                                    tracing::info!(
422                                        "  Auto-confirming drop for database '{}' (--yes flag)",
423                                        db_info.name
424                                    );
425                                    true
426                                } else {
427                                    // Interactive mode: prompt user
428                                    prompt_drop_database(&db_info.name)?
429                                };
430
431                                if should_drop {
432                                    drop_database_if_exists(&target_client, &db_info.name).await?;
433
434                                    // Recreate the database
435                                    let create_query = format!(
436                                        "CREATE DATABASE {}",
437                                        crate::utils::quote_ident(&db_info.name)
438                                    );
439                                    target_client
440                                        .execute(&create_query, &[])
441                                        .await
442                                        .with_context(|| {
443                                            format!(
444                                                "Failed to create database '{}' after drop",
445                                                db_info.name
446                                            )
447                                        })?;
448                                    tracing::info!("  Created database '{}'", db_info.name);
449                                } else {
450                                    bail!("Aborted: Database '{}' already exists", db_info.name);
451                                }
452                            }
453                        } else {
454                            // Some other database error - propagate it
455                            return Err(err).with_context(|| {
456                                format!("Failed to create database '{}'", db_info.name)
457                            });
458                        }
459                    } else {
460                        // Not a database error - propagate it
461                        return Err(err).with_context(|| {
462                            format!("Failed to create database '{}'", db_info.name)
463                        });
464                    }
465                }
466            }
467        } // Connection dropped here before dump/restore operations
468
469        // Dump and restore schema
470        tracing::info!("  Dumping schema for '{}'...", db_info.name);
471        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
472        migration::dump_schema(
473            &source_db_url,
474            &db_info.name,
475            schema_file.to_str().unwrap(),
476            &filter,
477        )
478        .await?;
479
480        tracing::info!("  Restoring schema for '{}'...", db_info.name);
481        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;
482
483        // Dump and restore data (using directory format for parallel operations)
484        tracing::info!("  Dumping data for '{}'...", db_info.name);
485        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
486        migration::dump_data(
487            &source_db_url,
488            &db_info.name,
489            data_dir.to_str().unwrap(),
490            &filter,
491        )
492        .await?;
493
494        tracing::info!("  Restoring data for '{}'...", db_info.name);
495        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;
496
497        if !filtered_tables.is_empty() {
498            tracing::info!(
499                "  Applying filtered replication for {} table(s)...",
500                filtered_tables.len()
501            );
502            migration::filtered::copy_filtered_tables(
503                &source_db_url,
504                &target_db_url,
505                &filtered_tables,
506            )
507            .await?;
508        }
509
510        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);
511
512        checkpoint_state.mark_completed(&db_info.name);
513        checkpoint_state
514            .save(&checkpoint_path)
515            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
516    }
517
518    // Explicitly clean up temp directory
519    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
520    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
521        tracing::warn!("Failed to clean up temp directory: {}", e);
522        // Don't fail the entire operation if cleanup fails
523    }
524
525    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
526        tracing::warn!("Failed to remove checkpoint state: {}", err);
527    }
528
529    tracing::info!("✅ Initial replication complete");
530
531    // Check wal_level before attempting to set up sync
532    let mut should_enable_sync = enable_sync;
533    if enable_sync {
534        tracing::info!("Checking target wal_level for logical replication...");
535        let target_wal_level = {
536            // Scope the connection for quick wal_level check
537            let target_client = postgres::connect_with_retry(target_url).await?;
538            postgres::check_wal_level(&target_client).await?
539        }; // Connection dropped here
540
541        if target_wal_level != "logical" {
542            tracing::warn!("");
543            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
544            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
545            tracing::warn!("");
546            tracing::warn!("  To fix this:");
547            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
548            tracing::warn!("    2. Restart PostgreSQL server");
549            tracing::warn!(
550                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
551            );
552            tracing::warn!("");
553            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
554            should_enable_sync = false;
555        }
556    }
557
558    // Set up continuous logical replication if enabled
559    if should_enable_sync {
560        tracing::info!("");
561        tracing::info!("========================================");
562        tracing::info!("Step 5/5: Setting up continuous replication...");
563        tracing::info!("========================================");
564        tracing::info!("");
565
566        // Call sync command with the same filter
567        crate::commands::sync(
568            source_url,
569            target_url,
570            Some(filter),
571            None,
572            None,
573            None,
574            false,
575        )
576        .await
577        .context("Failed to set up continuous replication")?;
578
579        tracing::info!("");
580        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
581    } else {
582        tracing::info!("");
583        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
584        tracing::info!("  To enable it later, run:");
585        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
586    }
587
588    Ok(())
589}
590
591/// Replace the database name in a connection URL
592fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593    // Parse URL to find database name
594    // Format: postgresql://user:pass@host:port/database?params
595
596    // Split by '?' to separate params
597    let parts: Vec<&str> = url.split('?').collect();
598    let base_url = parts[0];
599    let params = if parts.len() > 1 {
600        Some(parts[1])
601    } else {
602        None
603    };
604
605    // Split base by '/' to get everything before database name
606    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607    if url_parts.len() != 2 {
608        anyhow::bail!("Invalid connection URL format");
609    }
610
611    // Reconstruct URL with new database name
612    let mut new_url = format!("{}/{}", url_parts[1], new_database);
613    if let Some(p) = params {
614        new_url = format!("{}?{}", new_url, p);
615    }
616
617    Ok(new_url)
618}
619
620/// Display database size estimates and prompt for confirmation
621///
622/// Shows a table with database names, sizes, and estimated replication times.
623/// Prompts the user to proceed with the replication.
624///
625/// # Arguments
626///
627/// * `sizes` - Vector of database size estimates
628///
629/// # Returns
630///
631/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
632///
633/// # Errors
634///
635/// Returns an error if stdin/stdout operations fail.
636fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637    use std::time::Duration;
638
639    // Calculate totals
640    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643    // Print table header
644    println!();
645    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646    println!("{}", "─".repeat(50));
647
648    // Print each database
649    for size in sizes {
650        println!(
651            "{:<20} {:<12} {:<15}",
652            size.name,
653            size.size_human,
654            migration::format_duration(size.estimated_duration)
655        );
656    }
657
658    // Print totals
659    println!("{}", "─".repeat(50));
660    println!(
661        "Total: {} (estimated {})",
662        migration::format_bytes(total_bytes),
663        migration::format_duration(total_duration)
664    );
665    println!();
666
667    // Prompt for confirmation
668    print!("Proceed with replication? [y/N]: ");
669    io::stdout().flush()?;
670
671    let mut input = String::new();
672    io::stdin()
673        .read_line(&mut input)
674        .context("Failed to read user input")?;
675
676    Ok(input.trim().to_lowercase() == "y")
677}
678
679/// Checks if a database is empty (no user tables)
680///
681/// Uses the existing connection to avoid connection pool exhaustion on
682/// serverless PostgreSQL providers (SerenDB, Neon) that have strict limits.
683async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
684    let query = "
685        SELECT COUNT(*)
686        FROM information_schema.tables
687        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
688    ";
689
690    let row = client.query_one(query, &[]).await?;
691    let count: i64 = row.get(0);
692
693    Ok(count == 0)
694}
695
696/// Prompts user to drop existing database
697fn prompt_drop_database(db_name: &str) -> Result<bool> {
698    use std::io::{self, Write};
699
700    print!(
701        "\nWarning: Database '{}' already exists on target and contains data.\n\
702         Drop and recreate database? This will delete all existing data. [y/N]: ",
703        db_name
704    );
705    io::stdout().flush()?;
706
707    let mut input = String::new();
708    io::stdin().read_line(&mut input)?;
709
710    Ok(input.trim().eq_ignore_ascii_case("y"))
711}
712
713/// Drops a database if it exists
714async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
715    // Validate database name to prevent SQL injection
716    crate::utils::validate_postgres_identifier(db_name)
717        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
718
719    tracing::info!("  Dropping existing database '{}'...", db_name);
720
721    // Terminate existing connections to the database
722    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
723    let terminate_query = "
724        SELECT pg_terminate_backend(sa.pid)
725        FROM pg_stat_activity sa
726        JOIN pg_roles r ON sa.usename = r.rolname
727        WHERE sa.datname = $1
728          AND sa.pid <> pg_backend_pid()
729          AND NOT r.rolsuper
730    ";
731    target_conn.execute(terminate_query, &[&db_name]).await?;
732
733    // Check if any connections remain (including SUPERUSER connections we couldn't terminate)
734    let remaining_query = "
735        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
736        FROM pg_stat_activity sa
737        WHERE sa.datname = $1
738          AND sa.pid <> pg_backend_pid()
739    ";
740    let row = target_conn
741        .query_one(remaining_query, &[&db_name])
742        .await
743        .context("Failed to check remaining connections")?;
744    let remaining_count: i64 = row.get(0);
745    let remaining_users: Option<String> = row.get(1);
746
747    if remaining_count > 0 {
748        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
749        bail!(
750            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
751             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
752             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
753             processes maintain superuser connections.\n\n\
754             To resolve this:\n\
755             1. Wait a few minutes and retry (system connections may be temporary)\n\
756             2. Ask your database administrator to terminate the blocking sessions:\n\
757                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
758             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
759            db_name,
760            remaining_count,
761            users,
762            db_name
763        );
764    }
765
766    // Drop the database
767    let drop_query = format!(
768        "DROP DATABASE IF EXISTS {}",
769        crate::utils::quote_ident(db_name)
770    );
771    target_conn
772        .execute(&drop_query, &[])
773        .await
774        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
775
776    tracing::info!("  ✓ Database '{}' dropped", db_name);
777    Ok(())
778}
779
780/// Initial replication from SQLite to PostgreSQL
781///
782/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
783/// 1. Validates SQLite file exists and is readable
784/// 2. Validates PostgreSQL target connection
785/// 3. Lists all tables from SQLite database
786/// 4. For each table:
787///    - Converts rows to JSONB format
788///    - Creates JSONB table in PostgreSQL
789///    - Batch inserts all data
790///
791/// All SQLite data is stored as JSONB with metadata:
792/// - id: Original row ID (from ID column or row number)
793/// - data: Complete row as JSON object
794/// - _source_type: "sqlite"
795/// - _migrated_at: Timestamp of migration
796///
797/// # Arguments
798///
799/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
800/// * `target_url` - PostgreSQL connection string for target (Seren) database
801///
802/// # Returns
803///
804/// Returns `Ok(())` if migration completes successfully.
805///
806/// # Errors
807///
808/// This function will return an error if:
809/// - SQLite file doesn't exist or isn't readable
810/// - Cannot connect to target PostgreSQL database
811/// - Table conversion fails
812/// - Database creation or insert operations fail
813///
814/// # Examples
815///
816/// ```no_run
817/// # use anyhow::Result;
818/// # use database_replicator::commands::init::init_sqlite_to_postgres;
819/// # async fn example() -> Result<()> {
820/// init_sqlite_to_postgres(
821///     "database.db",
822///     "postgresql://user:pass@seren.example.com/targetdb"
823/// ).await?;
824/// # Ok(())
825/// # }
826/// ```
827pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
828    tracing::info!("Starting SQLite to PostgreSQL migration...");
829
830    // Step 1: Validate SQLite file
831    tracing::info!("Step 1/4: Validating SQLite database...");
832    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
833        .context("SQLite file validation failed")?;
834    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
835
836    // Step 2: Open SQLite connection (read-only)
837    tracing::info!("Step 2/4: Opening SQLite database...");
838    let sqlite_conn =
839        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
840    tracing::info!("  ✓ SQLite database opened (read-only mode)");
841
842    // Step 3: List all tables
843    tracing::info!("Step 3/4: Discovering tables...");
844    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
845        .context("Failed to list tables from SQLite database")?;
846
847    if tables.is_empty() {
848        tracing::warn!("⚠ No tables found in SQLite database");
849        tracing::info!("✅ Migration complete (no tables to migrate)");
850        return Ok(());
851    }
852
853    tracing::info!("Found {} table(s) to migrate", tables.len());
854
855    // Connect to PostgreSQL target
856    let target_client = postgres::connect_with_retry(target_url).await?;
857    tracing::info!("  ✓ Connected to PostgreSQL target");
858
859    // Step 4: Migrate each table
860    tracing::info!("Step 4/4: Migrating tables...");
861    for (idx, table_name) in tables.iter().enumerate() {
862        tracing::info!(
863            "Migrating table {}/{}: '{}'",
864            idx + 1,
865            tables.len(),
866            table_name
867        );
868
869        // Convert SQLite table to JSONB
870        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
871            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
872
873        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
874
875        // Create JSONB table in PostgreSQL
876        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
877            .await
878            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
879
880        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
881
882        if !rows.is_empty() {
883            // Batch insert all rows
884            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
885                .await
886                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
887
888            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
889        } else {
890            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
891        }
892    }
893
894    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
895    tracing::info!(
896        "   Migrated {} table(s) from '{}' to PostgreSQL",
897        tables.len(),
898        sqlite_path
899    );
900
901    Ok(())
902}
903
904/// Initial replication from MongoDB to PostgreSQL
905///
906/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
907/// 1. Validates MongoDB connection string
908/// 2. Connects to MongoDB and verifies connection
909/// 3. Extracts database name from connection string
910/// 4. Lists all collections from MongoDB database
911/// 5. For each collection:
912///    - Converts documents to JSONB format
913///    - Creates JSONB table in PostgreSQL
914///    - Batch inserts all data
915///
916/// All MongoDB data is stored as JSONB with metadata:
917/// - id: Original _id field (ObjectId → hex, String/Int → string)
918/// - data: Complete document as JSON object
919/// - _source_type: "mongodb"
920/// - _migrated_at: Timestamp of migration
921///
922/// # Arguments
923///
924/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
925/// * `target_url` - PostgreSQL connection string for target (Seren) database
926///
927/// # Returns
928///
929/// Returns `Ok(())` if migration completes successfully.
930///
931/// # Errors
932///
933/// This function will return an error if:
934/// - MongoDB connection string is invalid
935/// - Cannot connect to MongoDB database
936/// - Database name is not specified in connection string
937/// - Cannot connect to target PostgreSQL database
938/// - Collection conversion fails
939/// - Database creation or insert operations fail
940///
941/// # Examples
942///
943/// ```no_run
944/// # use anyhow::Result;
945/// # use database_replicator::commands::init::init_mongodb_to_postgres;
946/// # async fn example() -> Result<()> {
947/// init_mongodb_to_postgres(
948///     "mongodb://localhost:27017/mydb",
949///     "postgresql://user:pass@seren.example.com/targetdb"
950/// ).await?;
951/// # Ok(())
952/// # }
953/// ```
954pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
955    tracing::info!("Starting MongoDB to PostgreSQL migration...");
956
957    // Step 1: Validate and connect to MongoDB
958    tracing::info!("Step 1/5: Validating MongoDB connection...");
959    let client = crate::mongodb::connect_mongodb(mongo_url)
960        .await
961        .context("MongoDB connection failed")?;
962    tracing::info!("  ✓ MongoDB connection validated");
963
964    // Step 2: Extract database name
965    tracing::info!("Step 2/5: Extracting database name...");
966    let db_name = crate::mongodb::extract_database_name(mongo_url)
967        .await
968        .context("Failed to parse MongoDB connection string")?
969        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
970    tracing::info!("  ✓ Database name: '{}'", db_name);
971
972    // Step 3: List all collections
973    tracing::info!("Step 3/5: Discovering collections...");
974    let db = client.database(&db_name);
975    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
976        .await
977        .context("Failed to list collections from MongoDB database")?;
978
979    if collections.is_empty() {
980        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
981        tracing::info!("✅ Migration complete (no collections to migrate)");
982        return Ok(());
983    }
984
985    tracing::info!("Found {} collection(s) to migrate", collections.len());
986
987    // Step 4: Connect to PostgreSQL target
988    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
989    let target_client = postgres::connect_with_retry(target_url).await?;
990    tracing::info!("  ✓ Connected to PostgreSQL target");
991
992    // Step 5: Migrate each collection
993    tracing::info!("Step 5/5: Migrating collections...");
994    for (idx, collection_name) in collections.iter().enumerate() {
995        tracing::info!(
996            "Migrating collection {}/{}: '{}'",
997            idx + 1,
998            collections.len(),
999            collection_name
1000        );
1001
1002        // Convert MongoDB collection to JSONB
1003        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
1004            .await
1005            .with_context(|| {
1006                format!(
1007                    "Failed to convert collection '{}' to JSONB",
1008                    collection_name
1009                )
1010            })?;
1011
1012        tracing::info!(
1013            "  ✓ Converted {} documents from '{}'",
1014            rows.len(),
1015            collection_name
1016        );
1017
1018        // Create JSONB table in PostgreSQL
1019        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
1020            .await
1021            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
1022
1023        tracing::info!(
1024            "  ✓ Created JSONB table '{}' in PostgreSQL",
1025            collection_name
1026        );
1027
1028        if !rows.is_empty() {
1029            // Batch insert all rows
1030            crate::jsonb::writer::insert_jsonb_batch(
1031                &target_client,
1032                collection_name,
1033                rows,
1034                "mongodb",
1035            )
1036            .await
1037            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1038
1039            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1040        } else {
1041            tracing::info!(
1042                "  ✓ Collection '{}' is empty (no documents to insert)",
1043                collection_name
1044            );
1045        }
1046    }
1047
1048    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1049    tracing::info!(
1050        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1051        collections.len(),
1052        db_name
1053    );
1054
1055    Ok(())
1056}
1057
1058/// Initial replication from MySQL to PostgreSQL
1059///
1060/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1061/// 1. Validates MySQL connection string
1062/// 2. Connects to MySQL and verifies connection
1063/// 3. Extracts database name from connection string
1064/// 4. Lists all tables from MySQL database
1065/// 5. For each table:
1066///    - Converts rows to JSONB format
1067///    - Creates JSONB table in PostgreSQL
1068///    - Batch inserts all data
1069///
1070/// All MySQL data is stored as JSONB with metadata:
1071/// - id: Primary key or auto-generated ID
1072/// - data: Complete row as JSON object
1073/// - _source_type: "mysql"
1074/// - _migrated_at: Timestamp of replication
1075///
1076/// # Arguments
1077///
1078/// * `mysql_url` - MySQL connection string (mysql://...)
1079/// * `target_url` - PostgreSQL connection string for target (Seren) database
1080///
1081/// # Returns
1082///
1083/// Returns `Ok(())` if replication completes successfully.
1084///
1085/// # Errors
1086///
1087/// This function will return an error if:
1088/// - MySQL connection string is invalid
1089/// - Cannot connect to MySQL database
1090/// - Database name is not specified in connection string
1091/// - Cannot connect to target PostgreSQL database
1092/// - Table conversion fails
1093/// - Database creation or insert operations fail
1094///
1095/// # Examples
1096///
1097/// ```no_run
1098/// # use anyhow::Result;
1099/// # use database_replicator::commands::init::init_mysql_to_postgres;
1100/// # async fn example() -> Result<()> {
1101/// init_mysql_to_postgres(
1102///     "mysql://user:pass@localhost:3306/mydb",
1103///     "postgresql://user:pass@seren.example.com/targetdb"
1104/// ).await?;
1105/// # Ok(())
1106/// # }
1107/// ```
1108pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1109    tracing::info!("Starting MySQL to PostgreSQL replication...");
1110
1111    // Step 1: Validate and connect to MySQL
1112    tracing::info!("Step 1/5: Validating MySQL connection...");
1113    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1114        .await
1115        .context("MySQL connection failed")?;
1116    tracing::info!("  ✓ MySQL connection validated");
1117
1118    // Step 2: Extract database name
1119    tracing::info!("Step 2/5: Extracting database name...");
1120    let db_name = crate::mysql::extract_database_name(mysql_url)
1121        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1122    tracing::info!("  ✓ Database name: '{}'", db_name);
1123
1124    // Step 3: List all tables
1125    tracing::info!("Step 3/5: Discovering tables...");
1126    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1127        .await
1128        .context("Failed to list tables from MySQL database")?;
1129
1130    if tables.is_empty() {
1131        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1132        tracing::info!("✅ Replication complete (no tables to replicate)");
1133        return Ok(());
1134    }
1135
1136    tracing::info!("Found {} table(s) to replicate", tables.len());
1137
1138    // Step 4: Connect to PostgreSQL target
1139    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1140    let target_client = postgres::connect_with_retry(target_url).await?;
1141    tracing::info!("  ✓ Connected to PostgreSQL target");
1142
1143    // Step 5: Replicate each table
1144    tracing::info!("Step 5/5: Replicating tables...");
1145    for (idx, table_name) in tables.iter().enumerate() {
1146        tracing::info!(
1147            "Replicating table {}/{}: '{}'",
1148            idx + 1,
1149            tables.len(),
1150            table_name
1151        );
1152
1153        // Convert MySQL table to JSONB
1154        let rows =
1155            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1156                .await
1157                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1158
1159        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1160
1161        // Create JSONB table in PostgreSQL
1162        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1163            .await
1164            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1165
1166        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1167
1168        if !rows.is_empty() {
1169            // Batch insert all rows
1170            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1171                .await
1172                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1173
1174            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1175        } else {
1176            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1177        }
1178    }
1179
1180    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1181    tracing::info!(
1182        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1183        tables.len(),
1184        db_name
1185    );
1186
1187    Ok(())
1188}
1189
#[cfg(test)]
mod tests {
    use super::*;

    // Requires live databases; run explicitly with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // Automated run: skip the confirmation prompt and leave continuous
        // sync disabled so only the snapshot path is exercised.
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome =
            init(&source_url, &target_url, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        // Query parameters must survive the database-name swap.
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // The swap also works when no query parameters are present.
        let bare = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(bare, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    // Requires a live target database; run explicitly with `--ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let target_url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&target_url)
            .await
            .expect("Failed to connect");

        // The target may or may not contain user tables; this only verifies
        // that the helper runs without error.
        assert!(database_is_empty(&client).await.is_ok());
    }
}