database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    // Non-PostgreSQL sources return early from their match arm; only the
    // PostgreSQL arm falls through to the full dump/restore pipeline below.
    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Get explicit database list from filters (include_databases or extracted from include_tables)
            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            // Delegates entirely; none of the checkpoint/sync logic below applies.
            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) pair so
    // unrelated replications never collide.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // The raw globals dump is rewritten in several passes so it can be
    // restored onto a managed (non-superuser) PostgreSQL target.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: drop any stale checkpoint so the next run
        // starts clean, then exit successfully (globals were still restored).
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    // Metadata captures the run configuration; a later run with different
    // settings must not silently resume from an incompatible checkpoint.
    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: always start from a fresh checkpoint
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        // Checkpoint skip: databases already replicated in a previous
        // (interrupted) run are not redone.
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Tables with row-level filters are copied separately after the bulk
        // restore, since pg_dump copies whole tables.
        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after every database so a crash resumes here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    // should_enable_sync may be downgraded to false below if the target's
    // wal_level does not support logical replication.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        // NOTE(review): earlier steps are labeled "N/4"; this says "5/5" —
        // consider unifying the step numbering.
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        // NOTE(review): this branch is also reached when wal_level forced
        // sync off, not only via --no-sync — the message below may mislead.
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
590
591/// Replace the database name in a connection URL
592fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593    // Parse URL to find database name
594    // Format: postgresql://user:pass@host:port/database?params
595
596    // Split by '?' to separate params
597    let parts: Vec<&str> = url.split('?').collect();
598    let base_url = parts[0];
599    let params = if parts.len() > 1 {
600        Some(parts[1])
601    } else {
602        None
603    };
604
605    // Split base by '/' to get everything before database name
606    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607    if url_parts.len() != 2 {
608        anyhow::bail!("Invalid connection URL format");
609    }
610
611    // Reconstruct URL with new database name
612    let mut new_url = format!("{}/{}", url_parts[1], new_database);
613    if let Some(p) = params {
614        new_url = format!("{}?{}", new_url, p);
615    }
616
617    Ok(new_url)
618}
619
620/// Display database size estimates and prompt for confirmation
621///
622/// Shows a table with database names, sizes, and estimated replication times.
623/// Prompts the user to proceed with the replication.
624///
625/// # Arguments
626///
627/// * `sizes` - Vector of database size estimates
628///
629/// # Returns
630///
631/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
632///
633/// # Errors
634///
635/// Returns an error if stdin/stdout operations fail.
636fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637    use std::time::Duration;
638
639    // Calculate totals
640    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643    // Print table header
644    println!();
645    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646    println!("{}", "─".repeat(50));
647
648    // Print each database
649    for size in sizes {
650        println!(
651            "{:<20} {:<12} {:<15}",
652            size.name,
653            size.size_human,
654            migration::format_duration(size.estimated_duration)
655        );
656    }
657
658    // Print totals
659    println!("{}", "─".repeat(50));
660    println!(
661        "Total: {} (estimated {})",
662        migration::format_bytes(total_bytes),
663        migration::format_duration(total_duration)
664    );
665    println!();
666
667    // Prompt for confirmation
668    print!("Proceed with replication? [y/N]: ");
669    io::stdout().flush()?;
670
671    let mut input = String::new();
672    io::stdin()
673        .read_line(&mut input)
674        .context("Failed to read user input")?;
675
676    Ok(input.trim().to_lowercase() == "y")
677}
678
679/// Checks if a database is empty (no user tables)
680async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
681    // Need to connect to the specific database to check tables
682    let db_url = replace_database_in_url(target_url, db_name)?;
683    let client = postgres::connect_with_retry(&db_url).await?;
684
685    let query = "
686        SELECT COUNT(*)
687        FROM information_schema.tables
688        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
689    ";
690
691    let row = client.query_one(query, &[]).await?;
692    let count: i64 = row.get(0);
693
694    Ok(count == 0)
695}
696
697/// Prompts user to drop existing database
698fn prompt_drop_database(db_name: &str) -> Result<bool> {
699    use std::io::{self, Write};
700
701    print!(
702        "\nWarning: Database '{}' already exists on target and contains data.\n\
703         Drop and recreate database? This will delete all existing data. [y/N]: ",
704        db_name
705    );
706    io::stdout().flush()?;
707
708    let mut input = String::new();
709    io::stdin().read_line(&mut input)?;
710
711    Ok(input.trim().eq_ignore_ascii_case("y"))
712}
713
714/// Drops a database if it exists
715async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
716    // Validate database name to prevent SQL injection
717    crate::utils::validate_postgres_identifier(db_name)
718        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
719
720    tracing::info!("  Dropping existing database '{}'...", db_name);
721
722    // Terminate existing connections to the database
723    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
724    let terminate_query = "
725        SELECT pg_terminate_backend(sa.pid)
726        FROM pg_stat_activity sa
727        JOIN pg_roles r ON sa.usename = r.rolname
728        WHERE sa.datname = $1
729          AND sa.pid <> pg_backend_pid()
730          AND NOT r.rolsuper
731    ";
732    target_conn.execute(terminate_query, &[&db_name]).await?;
733
734    // Drop the database
735    let drop_query = format!(
736        "DROP DATABASE IF EXISTS {}",
737        crate::utils::quote_ident(db_name)
738    );
739    target_conn
740        .execute(&drop_query, &[])
741        .await
742        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
743
744    tracing::info!("  ✓ Database '{}' dropped", db_name);
745    Ok(())
746}
747
748/// Initial replication from SQLite to PostgreSQL
749///
750/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
751/// 1. Validates SQLite file exists and is readable
752/// 2. Validates PostgreSQL target connection
753/// 3. Lists all tables from SQLite database
754/// 4. For each table:
755///    - Converts rows to JSONB format
756///    - Creates JSONB table in PostgreSQL
757///    - Batch inserts all data
758///
759/// All SQLite data is stored as JSONB with metadata:
760/// - id: Original row ID (from ID column or row number)
761/// - data: Complete row as JSON object
762/// - _source_type: "sqlite"
763/// - _migrated_at: Timestamp of migration
764///
765/// # Arguments
766///
767/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
768/// * `target_url` - PostgreSQL connection string for target (Seren) database
769///
770/// # Returns
771///
772/// Returns `Ok(())` if migration completes successfully.
773///
774/// # Errors
775///
776/// This function will return an error if:
777/// - SQLite file doesn't exist or isn't readable
778/// - Cannot connect to target PostgreSQL database
779/// - Table conversion fails
780/// - Database creation or insert operations fail
781///
782/// # Examples
783///
784/// ```no_run
785/// # use anyhow::Result;
786/// # use database_replicator::commands::init::init_sqlite_to_postgres;
787/// # async fn example() -> Result<()> {
788/// init_sqlite_to_postgres(
789///     "database.db",
790///     "postgresql://user:pass@seren.example.com/targetdb"
791/// ).await?;
792/// # Ok(())
793/// # }
794/// ```
795pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
796    tracing::info!("Starting SQLite to PostgreSQL migration...");
797
798    // Step 1: Validate SQLite file
799    tracing::info!("Step 1/4: Validating SQLite database...");
800    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
801        .context("SQLite file validation failed")?;
802    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
803
804    // Step 2: Open SQLite connection (read-only)
805    tracing::info!("Step 2/4: Opening SQLite database...");
806    let sqlite_conn =
807        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
808    tracing::info!("  ✓ SQLite database opened (read-only mode)");
809
810    // Step 3: List all tables
811    tracing::info!("Step 3/4: Discovering tables...");
812    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
813        .context("Failed to list tables from SQLite database")?;
814
815    if tables.is_empty() {
816        tracing::warn!("⚠ No tables found in SQLite database");
817        tracing::info!("✅ Migration complete (no tables to migrate)");
818        return Ok(());
819    }
820
821    tracing::info!("Found {} table(s) to migrate", tables.len());
822
823    // Connect to PostgreSQL target
824    let target_client = postgres::connect_with_retry(target_url).await?;
825    tracing::info!("  ✓ Connected to PostgreSQL target");
826
827    // Step 4: Migrate each table
828    tracing::info!("Step 4/4: Migrating tables...");
829    for (idx, table_name) in tables.iter().enumerate() {
830        tracing::info!(
831            "Migrating table {}/{}: '{}'",
832            idx + 1,
833            tables.len(),
834            table_name
835        );
836
837        // Convert SQLite table to JSONB
838        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
839            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
840
841        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
842
843        // Create JSONB table in PostgreSQL
844        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
845            .await
846            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
847
848        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
849
850        if !rows.is_empty() {
851            // Batch insert all rows
852            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
853                .await
854                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
855
856            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
857        } else {
858            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
859        }
860    }
861
862    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
863    tracing::info!(
864        "   Migrated {} table(s) from '{}' to PostgreSQL",
865        tables.len(),
866        sqlite_path
867    );
868
869    Ok(())
870}
871
872/// Initial replication from MongoDB to PostgreSQL
873///
874/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
875/// 1. Validates MongoDB connection string
876/// 2. Connects to MongoDB and verifies connection
877/// 3. Extracts database name from connection string
878/// 4. Lists all collections from MongoDB database
879/// 5. For each collection:
880///    - Converts documents to JSONB format
881///    - Creates JSONB table in PostgreSQL
882///    - Batch inserts all data
883///
884/// All MongoDB data is stored as JSONB with metadata:
885/// - id: Original _id field (ObjectId → hex, String/Int → string)
886/// - data: Complete document as JSON object
887/// - _source_type: "mongodb"
888/// - _migrated_at: Timestamp of migration
889///
890/// # Arguments
891///
892/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
893/// * `target_url` - PostgreSQL connection string for target (Seren) database
894///
895/// # Returns
896///
897/// Returns `Ok(())` if migration completes successfully.
898///
899/// # Errors
900///
901/// This function will return an error if:
902/// - MongoDB connection string is invalid
903/// - Cannot connect to MongoDB database
904/// - Database name is not specified in connection string
905/// - Cannot connect to target PostgreSQL database
906/// - Collection conversion fails
907/// - Database creation or insert operations fail
908///
909/// # Examples
910///
911/// ```no_run
912/// # use anyhow::Result;
913/// # use database_replicator::commands::init::init_mongodb_to_postgres;
914/// # async fn example() -> Result<()> {
915/// init_mongodb_to_postgres(
916///     "mongodb://localhost:27017/mydb",
917///     "postgresql://user:pass@seren.example.com/targetdb"
918/// ).await?;
919/// # Ok(())
920/// # }
921/// ```
922pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
923    tracing::info!("Starting MongoDB to PostgreSQL migration...");
924
925    // Step 1: Validate and connect to MongoDB
926    tracing::info!("Step 1/5: Validating MongoDB connection...");
927    let client = crate::mongodb::connect_mongodb(mongo_url)
928        .await
929        .context("MongoDB connection failed")?;
930    tracing::info!("  ✓ MongoDB connection validated");
931
932    // Step 2: Extract database name
933    tracing::info!("Step 2/5: Extracting database name...");
934    let db_name = crate::mongodb::extract_database_name(mongo_url)
935        .await
936        .context("Failed to parse MongoDB connection string")?
937        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
938    tracing::info!("  ✓ Database name: '{}'", db_name);
939
940    // Step 3: List all collections
941    tracing::info!("Step 3/5: Discovering collections...");
942    let db = client.database(&db_name);
943    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
944        .await
945        .context("Failed to list collections from MongoDB database")?;
946
947    if collections.is_empty() {
948        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
949        tracing::info!("✅ Migration complete (no collections to migrate)");
950        return Ok(());
951    }
952
953    tracing::info!("Found {} collection(s) to migrate", collections.len());
954
955    // Step 4: Connect to PostgreSQL target
956    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
957    let target_client = postgres::connect_with_retry(target_url).await?;
958    tracing::info!("  ✓ Connected to PostgreSQL target");
959
960    // Step 5: Migrate each collection
961    tracing::info!("Step 5/5: Migrating collections...");
962    for (idx, collection_name) in collections.iter().enumerate() {
963        tracing::info!(
964            "Migrating collection {}/{}: '{}'",
965            idx + 1,
966            collections.len(),
967            collection_name
968        );
969
970        // Convert MongoDB collection to JSONB
971        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
972            .await
973            .with_context(|| {
974                format!(
975                    "Failed to convert collection '{}' to JSONB",
976                    collection_name
977                )
978            })?;
979
980        tracing::info!(
981            "  ✓ Converted {} documents from '{}'",
982            rows.len(),
983            collection_name
984        );
985
986        // Create JSONB table in PostgreSQL
987        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
988            .await
989            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
990
991        tracing::info!(
992            "  ✓ Created JSONB table '{}' in PostgreSQL",
993            collection_name
994        );
995
996        if !rows.is_empty() {
997            // Batch insert all rows
998            crate::jsonb::writer::insert_jsonb_batch(
999                &target_client,
1000                collection_name,
1001                rows,
1002                "mongodb",
1003            )
1004            .await
1005            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1006
1007            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1008        } else {
1009            tracing::info!(
1010                "  ✓ Collection '{}' is empty (no documents to insert)",
1011                collection_name
1012            );
1013        }
1014    }
1015
1016    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1017    tracing::info!(
1018        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1019        collections.len(),
1020        db_name
1021    );
1022
1023    Ok(())
1024}
1025
1026/// Initial replication from MySQL to PostgreSQL
1027///
1028/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1029/// 1. Validates MySQL connection string
1030/// 2. Connects to MySQL and verifies connection
1031/// 3. Extracts database name from connection string
1032/// 4. Lists all tables from MySQL database
1033/// 5. For each table:
1034///    - Converts rows to JSONB format
1035///    - Creates JSONB table in PostgreSQL
1036///    - Batch inserts all data
1037///
1038/// All MySQL data is stored as JSONB with metadata:
1039/// - id: Primary key or auto-generated ID
1040/// - data: Complete row as JSON object
1041/// - _source_type: "mysql"
1042/// - _migrated_at: Timestamp of replication
1043///
1044/// # Arguments
1045///
1046/// * `mysql_url` - MySQL connection string (mysql://...)
1047/// * `target_url` - PostgreSQL connection string for target (Seren) database
1048///
1049/// # Returns
1050///
1051/// Returns `Ok(())` if replication completes successfully.
1052///
1053/// # Errors
1054///
1055/// This function will return an error if:
1056/// - MySQL connection string is invalid
1057/// - Cannot connect to MySQL database
1058/// - Database name is not specified in connection string
1059/// - Cannot connect to target PostgreSQL database
1060/// - Table conversion fails
1061/// - Database creation or insert operations fail
1062///
1063/// # Examples
1064///
1065/// ```no_run
1066/// # use anyhow::Result;
1067/// # use database_replicator::commands::init::init_mysql_to_postgres;
1068/// # async fn example() -> Result<()> {
1069/// init_mysql_to_postgres(
1070///     "mysql://user:pass@localhost:3306/mydb",
1071///     "postgresql://user:pass@seren.example.com/targetdb"
1072/// ).await?;
1073/// # Ok(())
1074/// # }
1075/// ```
1076pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1077    tracing::info!("Starting MySQL to PostgreSQL replication...");
1078
1079    // Step 1: Validate and connect to MySQL
1080    tracing::info!("Step 1/5: Validating MySQL connection...");
1081    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1082        .await
1083        .context("MySQL connection failed")?;
1084    tracing::info!("  ✓ MySQL connection validated");
1085
1086    // Step 2: Extract database name
1087    tracing::info!("Step 2/5: Extracting database name...");
1088    let db_name = crate::mysql::extract_database_name(mysql_url)
1089        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1090    tracing::info!("  ✓ Database name: '{}'", db_name);
1091
1092    // Step 3: List all tables
1093    tracing::info!("Step 3/5: Discovering tables...");
1094    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1095        .await
1096        .context("Failed to list tables from MySQL database")?;
1097
1098    if tables.is_empty() {
1099        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1100        tracing::info!("✅ Replication complete (no tables to replicate)");
1101        return Ok(());
1102    }
1103
1104    tracing::info!("Found {} table(s) to replicate", tables.len());
1105
1106    // Step 4: Connect to PostgreSQL target
1107    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1108    let target_client = postgres::connect_with_retry(target_url).await?;
1109    tracing::info!("  ✓ Connected to PostgreSQL target");
1110
1111    // Step 5: Replicate each table
1112    tracing::info!("Step 5/5: Replicating tables...");
1113    for (idx, table_name) in tables.iter().enumerate() {
1114        tracing::info!(
1115            "Replicating table {}/{}: '{}'",
1116            idx + 1,
1117            tables.len(),
1118            table_name
1119        );
1120
1121        // Convert MySQL table to JSONB
1122        let rows =
1123            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1124                .await
1125                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1126
1127        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1128
1129        // Create JSONB table in PostgreSQL
1130        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1131            .await
1132            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1133
1134        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1135
1136        if !rows.is_empty() {
1137            // Batch insert all rows
1138            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1139                .await
1140                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1141
1142            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1143        } else {
1144            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1145        }
1146    }
1147
1148    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1149    tracing::info!(
1150        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1151        tables.len(),
1152        db_name
1153    );
1154
1155    Ok(())
1156}
1157
#[cfg(test)]
mod tests {
    use super::*;

    // Integration test: needs live source/target databases, hence #[ignore].
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // skip_confirmation = true for automation; enable_sync = false keeps
        // the test to a plain snapshot copy.
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome =
            init(&source_url, &target_url, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        // Query parameters must survive the database-name swap.
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // The swap must also work when there are no query parameters.
        let bare = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(bare, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    // Integration test: the 'postgres' maintenance database may or may not
    // hold user tables; we only verify the probe itself succeeds.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let outcome = database_is_empty(&url, "postgres").await;
        assert!(outcome.is_ok());
    }
}