database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    // Non-PostgreSQL sources take a simpler one-shot migration path and return
    // early from inside their match arm; only the PostgreSQL arm falls through
    // to the full dump/restore pipeline below.
    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Get explicit database list from filters (include_databases or extracted from include_tables)
            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            // NOTE(review): this condition looks inverted — the "doesn't support
            // continuous replication" warning fires only when the user already
            // disabled sync (enable_sync == false). Users who leave sync enabled
            // (the default) get no warning even though sync will not happen.
            // Verify intent; the MongoDB and MySQL arms below share this pattern.
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    // Each pass below rewrites globals.sql in place, stripping statements that
    // would fail on a restricted target (SUPERUSER, restricted GUC settings,
    // role grants, CREATE TABLESPACE) — presumably so the restore can run
    // without superuser privileges.
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: drop any stale checkpoint and exit successfully
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    // Resolve checkpoint state: resume a valid one, discard a mismatched or
    // explicitly-disabled one, or start fresh.
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables with row-level predicates are excluded from the bulk dump and
        // copied separately via copy_filtered_tables below.
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Checkpoint is saved after every database so a crash resumes here
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    // (logical replication requires wal_level = logical on the target; if not
    // set, downgrade gracefully to snapshot-only instead of erroring out)
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
590
591/// Replace the database name in a connection URL
592fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593    // Parse URL to find database name
594    // Format: postgresql://user:pass@host:port/database?params
595
596    // Split by '?' to separate params
597    let parts: Vec<&str> = url.split('?').collect();
598    let base_url = parts[0];
599    let params = if parts.len() > 1 {
600        Some(parts[1])
601    } else {
602        None
603    };
604
605    // Split base by '/' to get everything before database name
606    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607    if url_parts.len() != 2 {
608        anyhow::bail!("Invalid connection URL format");
609    }
610
611    // Reconstruct URL with new database name
612    let mut new_url = format!("{}/{}", url_parts[1], new_database);
613    if let Some(p) = params {
614        new_url = format!("{}?{}", new_url, p);
615    }
616
617    Ok(new_url)
618}
619
620/// Display database size estimates and prompt for confirmation
621///
622/// Shows a table with database names, sizes, and estimated replication times.
623/// Prompts the user to proceed with the replication.
624///
625/// # Arguments
626///
627/// * `sizes` - Vector of database size estimates
628///
629/// # Returns
630///
631/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
632///
633/// # Errors
634///
635/// Returns an error if stdin/stdout operations fail.
636fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637    use std::time::Duration;
638
639    // Calculate totals
640    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643    // Print table header
644    println!();
645    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646    println!("{}", "─".repeat(50));
647
648    // Print each database
649    for size in sizes {
650        println!(
651            "{:<20} {:<12} {:<15}",
652            size.name,
653            size.size_human,
654            migration::format_duration(size.estimated_duration)
655        );
656    }
657
658    // Print totals
659    println!("{}", "─".repeat(50));
660    println!(
661        "Total: {} (estimated {})",
662        migration::format_bytes(total_bytes),
663        migration::format_duration(total_duration)
664    );
665    println!();
666
667    // Prompt for confirmation
668    print!("Proceed with replication? [y/N]: ");
669    io::stdout().flush()?;
670
671    let mut input = String::new();
672    io::stdin()
673        .read_line(&mut input)
674        .context("Failed to read user input")?;
675
676    Ok(input.trim().to_lowercase() == "y")
677}
678
679/// Checks if a database is empty (no user tables)
680async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
681    // Need to connect to the specific database to check tables
682    let db_url = replace_database_in_url(target_url, db_name)?;
683    let client = postgres::connect_with_retry(&db_url).await?;
684
685    let query = "
686        SELECT COUNT(*)
687        FROM information_schema.tables
688        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
689    ";
690
691    let row = client.query_one(query, &[]).await?;
692    let count: i64 = row.get(0);
693
694    Ok(count == 0)
695}
696
697/// Prompts user to drop existing database
698fn prompt_drop_database(db_name: &str) -> Result<bool> {
699    use std::io::{self, Write};
700
701    print!(
702        "\nWarning: Database '{}' already exists on target and contains data.\n\
703         Drop and recreate database? This will delete all existing data. [y/N]: ",
704        db_name
705    );
706    io::stdout().flush()?;
707
708    let mut input = String::new();
709    io::stdin().read_line(&mut input)?;
710
711    Ok(input.trim().eq_ignore_ascii_case("y"))
712}
713
714/// Drops a database if it exists
715async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
716    // Validate database name to prevent SQL injection
717    crate::utils::validate_postgres_identifier(db_name)
718        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
719
720    tracing::info!("  Dropping existing database '{}'...", db_name);
721
722    // Terminate existing connections to the database
723    let terminate_query = "
724        SELECT pg_terminate_backend(pid)
725        FROM pg_stat_activity
726        WHERE datname = $1 AND pid <> pg_backend_pid()
727    ";
728    target_conn.execute(terminate_query, &[&db_name]).await?;
729
730    // Drop the database
731    let drop_query = format!(
732        "DROP DATABASE IF EXISTS {}",
733        crate::utils::quote_ident(db_name)
734    );
735    target_conn
736        .execute(&drop_query, &[])
737        .await
738        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
739
740    tracing::info!("  ✓ Database '{}' dropped", db_name);
741    Ok(())
742}
743
744/// Initial replication from SQLite to PostgreSQL
745///
746/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
747/// 1. Validates SQLite file exists and is readable
748/// 2. Validates PostgreSQL target connection
749/// 3. Lists all tables from SQLite database
750/// 4. For each table:
751///    - Converts rows to JSONB format
752///    - Creates JSONB table in PostgreSQL
753///    - Batch inserts all data
754///
755/// All SQLite data is stored as JSONB with metadata:
756/// - id: Original row ID (from ID column or row number)
757/// - data: Complete row as JSON object
758/// - _source_type: "sqlite"
759/// - _migrated_at: Timestamp of migration
760///
761/// # Arguments
762///
763/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
764/// * `target_url` - PostgreSQL connection string for target (Seren) database
765///
766/// # Returns
767///
768/// Returns `Ok(())` if migration completes successfully.
769///
770/// # Errors
771///
772/// This function will return an error if:
773/// - SQLite file doesn't exist or isn't readable
774/// - Cannot connect to target PostgreSQL database
775/// - Table conversion fails
776/// - Database creation or insert operations fail
777///
778/// # Examples
779///
780/// ```no_run
781/// # use anyhow::Result;
782/// # use database_replicator::commands::init::init_sqlite_to_postgres;
783/// # async fn example() -> Result<()> {
784/// init_sqlite_to_postgres(
785///     "database.db",
786///     "postgresql://user:pass@seren.example.com/targetdb"
787/// ).await?;
788/// # Ok(())
789/// # }
790/// ```
791pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
792    tracing::info!("Starting SQLite to PostgreSQL migration...");
793
794    // Step 1: Validate SQLite file
795    tracing::info!("Step 1/4: Validating SQLite database...");
796    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
797        .context("SQLite file validation failed")?;
798    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
799
800    // Step 2: Open SQLite connection (read-only)
801    tracing::info!("Step 2/4: Opening SQLite database...");
802    let sqlite_conn =
803        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
804    tracing::info!("  ✓ SQLite database opened (read-only mode)");
805
806    // Step 3: List all tables
807    tracing::info!("Step 3/4: Discovering tables...");
808    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
809        .context("Failed to list tables from SQLite database")?;
810
811    if tables.is_empty() {
812        tracing::warn!("⚠ No tables found in SQLite database");
813        tracing::info!("✅ Migration complete (no tables to migrate)");
814        return Ok(());
815    }
816
817    tracing::info!("Found {} table(s) to migrate", tables.len());
818
819    // Connect to PostgreSQL target
820    let target_client = postgres::connect_with_retry(target_url).await?;
821    tracing::info!("  ✓ Connected to PostgreSQL target");
822
823    // Step 4: Migrate each table
824    tracing::info!("Step 4/4: Migrating tables...");
825    for (idx, table_name) in tables.iter().enumerate() {
826        tracing::info!(
827            "Migrating table {}/{}: '{}'",
828            idx + 1,
829            tables.len(),
830            table_name
831        );
832
833        // Convert SQLite table to JSONB
834        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
835            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
836
837        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
838
839        // Create JSONB table in PostgreSQL
840        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
841            .await
842            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
843
844        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
845
846        if !rows.is_empty() {
847            // Batch insert all rows
848            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
849                .await
850                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
851
852            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
853        } else {
854            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
855        }
856    }
857
858    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
859    tracing::info!(
860        "   Migrated {} table(s) from '{}' to PostgreSQL",
861        tables.len(),
862        sqlite_path
863    );
864
865    Ok(())
866}
867
868/// Initial replication from MongoDB to PostgreSQL
869///
870/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
871/// 1. Validates MongoDB connection string
872/// 2. Connects to MongoDB and verifies connection
873/// 3. Extracts database name from connection string
874/// 4. Lists all collections from MongoDB database
875/// 5. For each collection:
876///    - Converts documents to JSONB format
877///    - Creates JSONB table in PostgreSQL
878///    - Batch inserts all data
879///
880/// All MongoDB data is stored as JSONB with metadata:
881/// - id: Original _id field (ObjectId → hex, String/Int → string)
882/// - data: Complete document as JSON object
883/// - _source_type: "mongodb"
884/// - _migrated_at: Timestamp of migration
885///
886/// # Arguments
887///
888/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
889/// * `target_url` - PostgreSQL connection string for target (Seren) database
890///
891/// # Returns
892///
893/// Returns `Ok(())` if migration completes successfully.
894///
895/// # Errors
896///
897/// This function will return an error if:
898/// - MongoDB connection string is invalid
899/// - Cannot connect to MongoDB database
900/// - Database name is not specified in connection string
901/// - Cannot connect to target PostgreSQL database
902/// - Collection conversion fails
903/// - Database creation or insert operations fail
904///
905/// # Examples
906///
907/// ```no_run
908/// # use anyhow::Result;
909/// # use database_replicator::commands::init::init_mongodb_to_postgres;
910/// # async fn example() -> Result<()> {
911/// init_mongodb_to_postgres(
912///     "mongodb://localhost:27017/mydb",
913///     "postgresql://user:pass@seren.example.com/targetdb"
914/// ).await?;
915/// # Ok(())
916/// # }
917/// ```
918pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
919    tracing::info!("Starting MongoDB to PostgreSQL migration...");
920
921    // Step 1: Validate and connect to MongoDB
922    tracing::info!("Step 1/5: Validating MongoDB connection...");
923    let client = crate::mongodb::connect_mongodb(mongo_url)
924        .await
925        .context("MongoDB connection failed")?;
926    tracing::info!("  ✓ MongoDB connection validated");
927
928    // Step 2: Extract database name
929    tracing::info!("Step 2/5: Extracting database name...");
930    let db_name = crate::mongodb::extract_database_name(mongo_url)
931        .await
932        .context("Failed to parse MongoDB connection string")?
933        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
934    tracing::info!("  ✓ Database name: '{}'", db_name);
935
936    // Step 3: List all collections
937    tracing::info!("Step 3/5: Discovering collections...");
938    let db = client.database(&db_name);
939    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
940        .await
941        .context("Failed to list collections from MongoDB database")?;
942
943    if collections.is_empty() {
944        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
945        tracing::info!("✅ Migration complete (no collections to migrate)");
946        return Ok(());
947    }
948
949    tracing::info!("Found {} collection(s) to migrate", collections.len());
950
951    // Step 4: Connect to PostgreSQL target
952    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
953    let target_client = postgres::connect_with_retry(target_url).await?;
954    tracing::info!("  ✓ Connected to PostgreSQL target");
955
956    // Step 5: Migrate each collection
957    tracing::info!("Step 5/5: Migrating collections...");
958    for (idx, collection_name) in collections.iter().enumerate() {
959        tracing::info!(
960            "Migrating collection {}/{}: '{}'",
961            idx + 1,
962            collections.len(),
963            collection_name
964        );
965
966        // Convert MongoDB collection to JSONB
967        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
968            .await
969            .with_context(|| {
970                format!(
971                    "Failed to convert collection '{}' to JSONB",
972                    collection_name
973                )
974            })?;
975
976        tracing::info!(
977            "  ✓ Converted {} documents from '{}'",
978            rows.len(),
979            collection_name
980        );
981
982        // Create JSONB table in PostgreSQL
983        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
984            .await
985            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
986
987        tracing::info!(
988            "  ✓ Created JSONB table '{}' in PostgreSQL",
989            collection_name
990        );
991
992        if !rows.is_empty() {
993            // Batch insert all rows
994            crate::jsonb::writer::insert_jsonb_batch(
995                &target_client,
996                collection_name,
997                rows,
998                "mongodb",
999            )
1000            .await
1001            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1002
1003            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1004        } else {
1005            tracing::info!(
1006                "  ✓ Collection '{}' is empty (no documents to insert)",
1007                collection_name
1008            );
1009        }
1010    }
1011
1012    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1013    tracing::info!(
1014        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1015        collections.len(),
1016        db_name
1017    );
1018
1019    Ok(())
1020}
1021
1022/// Initial replication from MySQL to PostgreSQL
1023///
1024/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1025/// 1. Validates MySQL connection string
1026/// 2. Connects to MySQL and verifies connection
1027/// 3. Extracts database name from connection string
1028/// 4. Lists all tables from MySQL database
1029/// 5. For each table:
1030///    - Converts rows to JSONB format
1031///    - Creates JSONB table in PostgreSQL
1032///    - Batch inserts all data
1033///
1034/// All MySQL data is stored as JSONB with metadata:
1035/// - id: Primary key or auto-generated ID
1036/// - data: Complete row as JSON object
1037/// - _source_type: "mysql"
1038/// - _migrated_at: Timestamp of replication
1039///
1040/// # Arguments
1041///
1042/// * `mysql_url` - MySQL connection string (mysql://...)
1043/// * `target_url` - PostgreSQL connection string for target (Seren) database
1044///
1045/// # Returns
1046///
1047/// Returns `Ok(())` if replication completes successfully.
1048///
1049/// # Errors
1050///
1051/// This function will return an error if:
1052/// - MySQL connection string is invalid
1053/// - Cannot connect to MySQL database
1054/// - Database name is not specified in connection string
1055/// - Cannot connect to target PostgreSQL database
1056/// - Table conversion fails
1057/// - Database creation or insert operations fail
1058///
1059/// # Examples
1060///
1061/// ```no_run
1062/// # use anyhow::Result;
1063/// # use database_replicator::commands::init::init_mysql_to_postgres;
1064/// # async fn example() -> Result<()> {
1065/// init_mysql_to_postgres(
1066///     "mysql://user:pass@localhost:3306/mydb",
1067///     "postgresql://user:pass@seren.example.com/targetdb"
1068/// ).await?;
1069/// # Ok(())
1070/// # }
1071/// ```
1072pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1073    tracing::info!("Starting MySQL to PostgreSQL replication...");
1074
1075    // Step 1: Validate and connect to MySQL
1076    tracing::info!("Step 1/5: Validating MySQL connection...");
1077    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1078        .await
1079        .context("MySQL connection failed")?;
1080    tracing::info!("  ✓ MySQL connection validated");
1081
1082    // Step 2: Extract database name
1083    tracing::info!("Step 2/5: Extracting database name...");
1084    let db_name = crate::mysql::extract_database_name(mysql_url)
1085        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1086    tracing::info!("  ✓ Database name: '{}'", db_name);
1087
1088    // Step 3: List all tables
1089    tracing::info!("Step 3/5: Discovering tables...");
1090    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1091        .await
1092        .context("Failed to list tables from MySQL database")?;
1093
1094    if tables.is_empty() {
1095        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1096        tracing::info!("✅ Replication complete (no tables to replicate)");
1097        return Ok(());
1098    }
1099
1100    tracing::info!("Found {} table(s) to replicate", tables.len());
1101
1102    // Step 4: Connect to PostgreSQL target
1103    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1104    let target_client = postgres::connect_with_retry(target_url).await?;
1105    tracing::info!("  ✓ Connected to PostgreSQL target");
1106
1107    // Step 5: Replicate each table
1108    tracing::info!("Step 5/5: Replicating tables...");
1109    for (idx, table_name) in tables.iter().enumerate() {
1110        tracing::info!(
1111            "Replicating table {}/{}: '{}'",
1112            idx + 1,
1113            tables.len(),
1114            table_name
1115        );
1116
1117        // Convert MySQL table to JSONB
1118        let rows =
1119            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1120                .await
1121                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1122
1123        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1124
1125        // Create JSONB table in PostgreSQL
1126        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1127            .await
1128            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1129
1130        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1131
1132        if !rows.is_empty() {
1133            // Batch insert all rows
1134            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1135                .await
1136                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1137
1138            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1139        } else {
1140            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1141        }
1142    }
1143
1144    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1145    tracing::info!(
1146        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1147        tables.len(),
1148        db_name
1149    );
1150
1151    Ok(())
1152}
1153
#[cfg(test)]
mod tests {
    use super::*;

    // Requires live source/target databases; run with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(
            init(&source, &target, true, filter, false, false, true, false)
                .await
                .is_ok()
        );
    }

    #[test]
    fn test_replace_database_in_url() {
        // Query parameters after the database segment must be preserved.
        assert_eq!(
            replace_database_in_url(
                "postgresql://user:pass@host:5432/olddb?sslmode=require",
                "newdb"
            )
            .unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // And the simple case without query parameters.
        assert_eq!(
            replace_database_in_url("postgresql://user:pass@host:5432/olddb", "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    // Requires a live target database; run with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables —
        // this only verifies the function doesn't crash.
        assert!(database_is_empty(&url, "postgres").await.is_ok());
    }
}