database_replicator/commands/init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Get explicit database list from filters (include_databases or extracted from include_tables)
            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            // NOTE(review): this warns only when sync is already disabled
            // (`!enable_sync`); presumably it should fire when sync IS requested,
            // since SQLite cannot provide it — confirm intended polarity.
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            // NOTE(review): same suspected inverted condition as the SQLite arm above.
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            // NOTE(review): same suspected inverted condition as the SQLite arm above.
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the source/target pair, so an
    // interrupted run against the same pair can be resumed later.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    // to_str() panics on non-UTF-8 paths; managed temp dirs are presumably
    // always UTF-8 — NOTE(review): confirm, or propagate an error instead.
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Post-process the dump so restore won't fail on the target: ignore duplicate
    // roles, then strip SUPERUSER attributes, restricted parameter settings,
    // restricted role grants, and CREATE TABLESPACE statements.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: discard any stale checkpoint (best-effort) and exit.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    // Metadata (filter fingerprint + flags) lets a later run detect whether an
    // existing checkpoint was produced by a compatible configuration.
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: always start from a clean slate.
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Check if user specified a target database name that doesn't match source databases
    // This is important because init preserves source database names during replication
    if let Ok(target_parts) = crate::utils::parse_postgres_url(target_url) {
        let target_db = &target_parts.database;
        let source_db_names: Vec<&str> = databases.iter().map(|db| db.name.as_str()).collect();

        if !source_db_names.contains(&target_db.as_str())
            && target_db != "postgres"
            && !target_db.is_empty()
        {
            println!();
            println!("========================================");
            println!("⚠️  IMPORTANT: Target database name ignored");
            println!("========================================");
            println!();
            println!(
                "You specified target database '{}', but replication preserves",
                target_db
            );
            println!("source database names. Data will be replicated to:");
            println!();
            for db_name in &source_db_names {
                println!("  → {}", db_name);
            }
            println!();
            println!(
                "The '{}' database in your target URL is used only for",
                target_db
            );
            println!("the initial connection. Source databases will be created");
            println!("on the target server with their original names.");
            println!();
            println!("When running 'sync' later, use the correct database name:");
            if let Some(first_db) = source_db_names.first() {
                println!("  --target \"postgresql://.../{}\"\n", first_db);
            }
            println!("========================================");
            println!();
        }
    }

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables with row predicates are excluded from the bulk copy and handled
        // by copy_filtered_tables after the schema/data restore below.
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty by connecting to the SPECIFIC database
                            // (target_client is connected to default db, not the target db)
                            let is_empty = {
                                let db_client =
                                    postgres::connect_with_retry(&target_db_url).await?;
                                database_is_empty(&db_client).await?
                            }; // db_client dropped here

                            if is_empty {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Record progress only after the database fully replicated, so a crash
        // mid-database replays that database on resume.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            // Downgrade gracefully to snapshot-only instead of failing the run.
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        // NOTE(review): the three `None`s and trailing `false` are positional —
        // verify against sync()'s signature before changing any of them.
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
638
639/// Replace the database name in a connection URL
640fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
641    // Parse URL to find database name
642    // Format: postgresql://user:pass@host:port/database?params
643
644    // Split by '?' to separate params
645    let parts: Vec<&str> = url.split('?').collect();
646    let base_url = parts[0];
647    let params = if parts.len() > 1 {
648        Some(parts[1])
649    } else {
650        None
651    };
652
653    // Split base by '/' to get everything before database name
654    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
655    if url_parts.len() != 2 {
656        anyhow::bail!("Invalid connection URL format");
657    }
658
659    // Reconstruct URL with new database name
660    let mut new_url = format!("{}/{}", url_parts[1], new_database);
661    if let Some(p) = params {
662        new_url = format!("{}?{}", new_url, p);
663    }
664
665    Ok(new_url)
666}
667
668/// Display database size estimates and prompt for confirmation
669///
670/// Shows a table with database names, sizes, and estimated replication times.
671/// Prompts the user to proceed with the replication.
672///
673/// # Arguments
674///
675/// * `sizes` - Vector of database size estimates
676///
677/// # Returns
678///
679/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
680///
681/// # Errors
682///
683/// Returns an error if stdin/stdout operations fail.
684fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
685    use std::time::Duration;
686
687    // Calculate totals
688    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
689    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
690
691    // Print table header
692    println!();
693    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
694    println!("{}", "─".repeat(50));
695
696    // Print each database
697    for size in sizes {
698        println!(
699            "{:<20} {:<12} {:<15}",
700            size.name,
701            size.size_human,
702            migration::format_duration(size.estimated_duration)
703        );
704    }
705
706    // Print totals
707    println!("{}", "─".repeat(50));
708    println!(
709        "Total: {} (estimated {})",
710        migration::format_bytes(total_bytes),
711        migration::format_duration(total_duration)
712    );
713    println!();
714
715    // Prompt for confirmation
716    print!("Proceed with replication? [y/N]: ");
717    io::stdout().flush()?;
718
719    let mut input = String::new();
720    io::stdin()
721        .read_line(&mut input)
722        .context("Failed to read user input")?;
723
724    Ok(input.trim().to_lowercase() == "y")
725}
726
727/// Checks if the currently connected database is empty (has no user tables).
728///
729/// Includes a 30-second timeout to prevent hanging on stale serverless connections.
730async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
731    let query = "
732        SELECT COUNT(*)
733        FROM information_schema.tables
734        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
735    ";
736
737    // Add timeout to prevent indefinite hang on stale serverless connections
738    let row = tokio::time::timeout(
739        std::time::Duration::from_secs(30),
740        client.query_one(query, &[]),
741    )
742    .await
743    .context("database_is_empty query timed out after 30 seconds")?
744    .context("Failed to query information_schema.tables")?;
745
746    let count: i64 = row.get(0);
747
748    Ok(count == 0)
749}
750
751/// Prompts user to drop existing database
752fn prompt_drop_database(db_name: &str) -> Result<bool> {
753    use std::io::{self, Write};
754
755    print!(
756        "\nWarning: Database '{}' already exists on target and contains data.\n\
757         Drop and recreate database? This will delete all existing data. [y/N]: ",
758        db_name
759    );
760    io::stdout().flush()?;
761
762    let mut input = String::new();
763    io::stdin().read_line(&mut input)?;
764
765    Ok(input.trim().eq_ignore_ascii_case("y"))
766}
767
768/// Drops a database if it exists
769async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
770    // Validate database name to prevent SQL injection
771    crate::utils::validate_postgres_identifier(db_name)
772        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
773
774    tracing::info!("  Dropping existing database '{}'...", db_name);
775
776    // Terminate existing connections to the database
777    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
778    let terminate_query = "
779        SELECT pg_terminate_backend(sa.pid)
780        FROM pg_stat_activity sa
781        JOIN pg_roles r ON sa.usename = r.rolname
782        WHERE sa.datname = $1
783          AND sa.pid <> pg_backend_pid()
784          AND NOT r.rolsuper
785    ";
786    target_conn.execute(terminate_query, &[&db_name]).await?;
787
788    // Check if any connections remain (including SUPERUSER connections we couldn't terminate)
789    let remaining_query = "
790        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
791        FROM pg_stat_activity sa
792        WHERE sa.datname = $1
793          AND sa.pid <> pg_backend_pid()
794    ";
795    let row = target_conn
796        .query_one(remaining_query, &[&db_name])
797        .await
798        .context("Failed to check remaining connections")?;
799    let remaining_count: i64 = row.get(0);
800    let remaining_users: Option<String> = row.get(1);
801
802    if remaining_count > 0 {
803        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
804        bail!(
805            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
806             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
807             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
808             processes maintain superuser connections.\n\n\
809             To resolve this:\n\
810             1. Wait a few minutes and retry (system connections may be temporary)\n\
811             2. Ask your database administrator to terminate the blocking sessions:\n\
812                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
813             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
814            db_name,
815            remaining_count,
816            users,
817            db_name
818        );
819    }
820
821    // Drop the database
822    let drop_query = format!(
823        "DROP DATABASE IF EXISTS {}",
824        crate::utils::quote_ident(db_name)
825    );
826    target_conn
827        .execute(&drop_query, &[])
828        .await
829        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
830
831    tracing::info!("  ✓ Database '{}' dropped", db_name);
832    Ok(())
833}
834
/// Initial replication from SQLite to PostgreSQL
///
/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
/// 1. Validates SQLite file exists and is readable
/// 2. Validates PostgreSQL target connection
/// 3. Lists all tables from SQLite database
/// 4. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All SQLite data is stored as JSONB with metadata:
/// - id: Original row ID (from ID column or row number)
/// - data: Complete row as JSON object
/// - _source_type: "sqlite"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - SQLite file doesn't exist or isn't readable
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_sqlite_to_postgres;
/// # async fn example() -> Result<()> {
/// init_sqlite_to_postgres(
///     "database.db",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    // Step 1: Validate SQLite file
    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());

    // Step 2: Open SQLite connection (read-only)
    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!("  ✓ SQLite database opened (read-only mode)");

    // Step 3: List all tables
    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    // An empty source database is a successful no-op, not an error.
    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    // Connect to PostgreSQL target
    // (done here, outside the "Step N/4" labels, which track SQLite-side work)
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Get row counts for progress display
    // Best-effort: a failed count falls back to 0 rather than aborting, since
    // the counts are only used for progress reporting.
    let mut table_row_counts: Vec<(&str, usize)> = Vec::new();
    let mut total_rows = 0usize;
    for table_name in &tables {
        let count =
            crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0);
        table_row_counts.push((table_name, count));
        total_rows += count;
    }

    tracing::info!(
        "Total rows to migrate: {} across {} table(s)",
        total_rows,
        tables.len()
    );

    // Step 4: Migrate each table using batched processing
    tracing::info!("Step 4/4: Migrating tables (batched processing)...");
    let mut migrated_rows = 0usize;

    for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}' ({} rows)",
            idx + 1,
            tables.len(),
            table_name,
            row_count
        );

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        // Use batched conversion for memory efficiency
        let rows_processed = crate::sqlite::converter::convert_table_batched(
            &sqlite_conn,
            &target_client,
            table_name,
            "sqlite",
            None, // Use default batch size
        )
        .await
        .with_context(|| format!("Failed to migrate table '{}'", table_name))?;

        migrated_rows += rows_processed;

        if rows_processed > 0 {
            // The percentage is cumulative progress (rows migrated so far over
            // the grand total), not this table's individual share.
            tracing::info!(
                "  ✓ Migrated {} rows from '{}' ({:.1}% of total)",
                rows_processed,
                table_name,
                if total_rows > 0 {
                    migrated_rows as f64 / total_rows as f64 * 100.0
                } else {
                    100.0
                }
            );
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to migrate)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} row(s) from {} table(s) in '{}'",
        migrated_rows,
        tables.len(),
        sqlite_path
    );

    Ok(())
}
994
/// Initial replication from MongoDB to PostgreSQL
///
/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
/// 1. Validates MongoDB connection string
/// 2. Connects to MongoDB and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all collections from MongoDB database
/// 5. For each collection:
///    - Converts documents to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MongoDB data is stored as JSONB with metadata:
/// - id: Original _id field (ObjectId → hex, String/Int → string)
/// - data: Complete document as JSON object
/// - _source_type: "mongodb"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MongoDB connection string is invalid
/// - Cannot connect to MongoDB database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Collection conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mongodb_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mongodb_to_postgres(
///     "mongodb://localhost:27017/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    // Step 1: Validate and connect to MongoDB
    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!("  ✓ MongoDB connection validated");

    // Step 2: Extract database name
    // Two contexts: the first covers a parse failure, the second covers a URL
    // that parsed but carries no database name (Option resolved via Context).
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all collections
    tracing::info!("Step 3/5: Discovering collections...");
    // Handle to the source database; reused by the converter for each collection.
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    // An empty source database is a successful no-op, not an error.
    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Migrate each collection
    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // Convert MongoDB collection to JSONB
        // NOTE(review): this appears to buffer the entire collection in memory
        // before inserting (unlike the batched SQLite path) — confirm this is
        // acceptable for very large collections.
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            "  ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, collection_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", collection_name))?;

        tracing::info!(
            "  ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                "  ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}
1153
/// Initial replication from MySQL to PostgreSQL
///
/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
/// 1. Validates MySQL connection string
/// 2. Connects to MySQL and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all tables from MySQL database
/// 5. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MySQL data is stored as JSONB with metadata:
/// - id: Primary key or auto-generated ID
/// - data: Complete row as JSON object
/// - _source_type: "mysql"
/// - _migrated_at: Timestamp of replication
///
/// # Arguments
///
/// * `mysql_url` - MySQL connection string (mysql://...)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if replication completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MySQL connection string is invalid
/// - Cannot connect to MySQL database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mysql_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mysql_to_postgres(
///     "mysql://user:pass@localhost:3306/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    // Step 1: Validate and connect to MySQL
    // The connection is mutable because the MySQL reader/converter APIs below
    // take `&mut` to the connection.
    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!("  ✓ MySQL connection validated");

    // Step 2: Extract database name
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all tables
    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    // An empty source database is a successful no-op, not an error.
    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Replicate each table
    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert MySQL table to JSONB
        // NOTE(review): the whole table appears to be materialized in memory
        // before insertion (unlike the batched SQLite path) — confirm this is
        // acceptable for very large tables.
        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        "   Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}
1290
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end replication smoke test. Requires live source/target
    /// databases (TEST_SOURCE_URL / TEST_TARGET_URL), so it only runs when
    /// explicitly requested via `--ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome = init(
            &source_url,
            &target_url,
            true,
            filter,
            false,
            false,
            true,
            false,
        )
        .await;
        assert!(outcome.is_ok());
    }

    /// URL rewriting keeps credentials, host, port, and query params intact
    /// while swapping only the database segment.
    #[test]
    fn test_replace_database_in_url() {
        let cases = [
            (
                "postgresql://user:pass@host:5432/olddb?sslmode=require",
                "postgresql://user:pass@host:5432/newdb?sslmode=require",
            ),
            (
                "postgresql://user:pass@host:5432/olddb",
                "postgresql://user:pass@host:5432/newdb",
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(replace_database_in_url(input, "newdb").unwrap(), expected);
        }
    }

    /// Verifies database_is_empty runs against a live target without error.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // Connect to the database first
        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        // The postgres database might legitimately have no user tables; this
        // only checks that the query itself succeeds.
        assert!(database_is_empty(&client).await.is_ok());
    }
}