database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation.
    // Non-PostgreSQL sources (SQLite/MongoDB/MySQL) return early from their
    // match arms below; only the PostgreSQL arm falls through to the full
    // dump/restore pipeline that follows the match.
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Get explicit database list from filters (include_databases or extracted from include_tables)
            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote.
                // Fallback is only offered for tool-version incompatibility
                // against a SerenDB target, and only when the user did not
                // explicitly pin local execution with --local.
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::info!(
                    "--drop-existing: existing JSONB tables on the target will be dropped"
                );
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url, drop_existing).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) URL pair, so
    // a re-run of the same replication finds its previous state.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects.
    // The raw pg_dumpall output is then sanitized in several passes so it can
    // be restored onto managed/non-superuser targets (duplicate roles,
    // SUPERUSER attributes, restricted GUCs/grants, tablespaces).
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: drop any stale checkpoint and exit successfully.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    // Fingerprint the filter so a checkpoint written under a different filter
    // configuration fails validation below instead of silently resuming.
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: any existing checkpoint is discarded unconditionally.
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Check if user specified a target database name that doesn't match source databases
    // This is important because init preserves source database names during replication
    if let Ok(target_parts) = crate::utils::parse_postgres_url(target_url) {
        let target_db = &target_parts.database;
        let source_db_names: Vec<&str> = databases.iter().map(|db| db.name.as_str()).collect();

        // Only warn for a non-default, non-empty target database name that
        // doesn't appear among the source databases.
        if !source_db_names.contains(&target_db.as_str())
            && target_db != "postgres"
            && !target_db.is_empty()
        {
            println!();
            println!("========================================");
            println!("⚠️  IMPORTANT: Target database name ignored");
            println!("========================================");
            println!();
            println!(
                "You specified target database '{}', but replication preserves",
                target_db
            );
            println!("source database names. Data will be replicated to:");
            println!();
            for db_name in &source_db_names {
                println!("  → {}", db_name);
            }
            println!();
            println!(
                "The '{}' database in your target URL is used only for",
                target_db
            );
            println!("the initial connection. Source databases will be created");
            println!("on the target server with their original names.");
            println!();
            println!("When running 'sync' later, use the correct database name:");
            if let Some(first_db) = source_db_names.first() {
                println!("  --target \"postgresql://.../{}\"\n", first_db);
            }
            println!("========================================");
            println!();
        }
    }

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables with row-level predicates are excluded from the bulk
        // dump/restore (per the filter) and copied separately below.
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty by connecting to the SPECIFIC database
                            // (target_client is connected to default db, not the target db)
                            let is_empty = {
                                let db_client =
                                    postgres::connect_with_retry(&target_db_url).await?;
                                database_is_empty(&db_client).await?
                            }; // db_client dropped here

                            if is_empty {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after each database so an interrupted run resumes
        // from the next database instead of starting over.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync.
    // If the target can't do logical replication we degrade to snapshot-only
    // instead of failing the whole run.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
640
641/// Replace the database name in a connection URL
642fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
643    // Parse URL to find database name
644    // Format: postgresql://user:pass@host:port/database?params
645
646    // Split by '?' to separate params
647    let parts: Vec<&str> = url.split('?').collect();
648    let base_url = parts[0];
649    let params = if parts.len() > 1 {
650        Some(parts[1])
651    } else {
652        None
653    };
654
655    // Split base by '/' to get everything before database name
656    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
657    if url_parts.len() != 2 {
658        anyhow::bail!("Invalid connection URL format");
659    }
660
661    // Reconstruct URL with new database name
662    let mut new_url = format!("{}/{}", url_parts[1], new_database);
663    if let Some(p) = params {
664        new_url = format!("{}?{}", new_url, p);
665    }
666
667    Ok(new_url)
668}
669
670/// Display database size estimates and prompt for confirmation
671///
672/// Shows a table with database names, sizes, and estimated replication times.
673/// Prompts the user to proceed with the replication.
674///
675/// # Arguments
676///
677/// * `sizes` - Vector of database size estimates
678///
679/// # Returns
680///
681/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
682///
683/// # Errors
684///
685/// Returns an error if stdin/stdout operations fail.
686fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
687    use std::time::Duration;
688
689    // Calculate totals
690    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
691    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
692
693    // Print table header
694    println!();
695    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
696    println!("{}", "─".repeat(50));
697
698    // Print each database
699    for size in sizes {
700        println!(
701            "{:<20} {:<12} {:<15}",
702            size.name,
703            size.size_human,
704            migration::format_duration(size.estimated_duration)
705        );
706    }
707
708    // Print totals
709    println!("{}", "─".repeat(50));
710    println!(
711        "Total: {} (estimated {})",
712        migration::format_bytes(total_bytes),
713        migration::format_duration(total_duration)
714    );
715    println!();
716
717    // Prompt for confirmation
718    print!("Proceed with replication? [y/N]: ");
719    io::stdout().flush()?;
720
721    let mut input = String::new();
722    io::stdin()
723        .read_line(&mut input)
724        .context("Failed to read user input")?;
725
726    Ok(input.trim().to_lowercase() == "y")
727}
728
729/// Checks if the currently connected database is empty (has no user tables).
730///
731/// Includes a 30-second timeout to prevent hanging on stale serverless connections.
732async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
733    let query = "
734        SELECT COUNT(*)
735        FROM information_schema.tables
736        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
737    ";
738
739    // Add timeout to prevent indefinite hang on stale serverless connections
740    let row = tokio::time::timeout(
741        std::time::Duration::from_secs(30),
742        client.query_one(query, &[]),
743    )
744    .await
745    .context("database_is_empty query timed out after 30 seconds")?
746    .context("Failed to query information_schema.tables")?;
747
748    let count: i64 = row.get(0);
749
750    Ok(count == 0)
751}
752
753/// Prompts user to drop existing database
754fn prompt_drop_database(db_name: &str) -> Result<bool> {
755    use std::io::{self, Write};
756
757    print!(
758        "\nWarning: Database '{}' already exists on target and contains data.\n\
759         Drop and recreate database? This will delete all existing data. [y/N]: ",
760        db_name
761    );
762    io::stdout().flush()?;
763
764    let mut input = String::new();
765    io::stdin().read_line(&mut input)?;
766
767    Ok(input.trim().eq_ignore_ascii_case("y"))
768}
769
/// Drops a database if it exists
///
/// Steps:
/// 1. Validates `db_name` as a PostgreSQL identifier (SQL-injection defense).
/// 2. Terminates active connections to the database, skipping sessions owned
///    by SUPERUSER roles (regular users cannot terminate those on managed
///    PostgreSQL services).
/// 3. Re-checks for remaining connections and bails with remediation guidance
///    if any could not be terminated.
/// 4. Issues `DROP DATABASE IF EXISTS` with a quoted identifier.
///
/// # Arguments
///
/// * `target_conn` - Client connected to the target PostgreSQL server
/// * `db_name` - Name of the database to drop
///
/// # Errors
///
/// Returns an error if `db_name` fails identifier validation, if connections
/// to the database remain after the terminate attempt, or if any query fails.
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Validate database name to prevent SQL injection
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!("  Dropping existing database '{}'...", db_name);

    // Terminate existing connections to the database
    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
    // Also skip our own backend so this session survives the sweep
    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
          AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Check if any connections remain (including SUPERUSER connections we couldn't terminate)
    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    // STRING_AGG yields NULL when no rows match, hence Option<String>
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
             processes maintain superuser connections.\n\n\
             To resolve this:\n\
             1. Wait a few minutes and retry (system connections may be temporary)\n\
             2. Ask your database administrator to terminate the blocking sessions:\n\
                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    // Drop the database
    // quote_ident guards the identifier even after the validation above
    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!("  ✓ Database '{}' dropped", db_name);
    Ok(())
}
836
/// Initial replication from SQLite to PostgreSQL
///
/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
/// 1. Validates SQLite file exists and is readable
/// 2. Validates PostgreSQL target connection
/// 3. Lists all tables from SQLite database
/// 4. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All SQLite data is stored as JSONB with metadata:
/// - id: Original row ID (from ID column or row number)
/// - data: Complete row as JSON object
/// - _source_type: "sqlite"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
/// * `drop_existing` - Drop any existing JSONB tables on the target before migrating
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - SQLite file doesn't exist or isn't readable
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_sqlite_to_postgres;
/// # async fn example() -> Result<()> {
/// init_sqlite_to_postgres(
///     "database.db",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     false,
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_sqlite_to_postgres(
    sqlite_path: &str,
    target_url: &str,
    drop_existing: bool,
) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    // Step 1: Validate SQLite file
    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());

    // Step 2: Open SQLite connection (read-only)
    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!("  ✓ SQLite database opened (read-only mode)");

    // Step 3: List all tables
    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    // An empty source database is a successful no-op, not an error
    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    // Connect to PostgreSQL target
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Get row counts for progress display
    // Count failures are non-fatal: unwrap_or(0) keeps progress reporting
    // best-effort instead of aborting the migration over a cosmetic query
    let mut table_row_counts: Vec<(&str, usize)> = Vec::new();
    let mut total_rows = 0usize;
    for table_name in &tables {
        let count =
            crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0);
        table_row_counts.push((table_name, count));
        total_rows += count;
    }

    tracing::info!(
        "Total rows to migrate: {} across {} table(s)",
        total_rows,
        tables.len()
    );

    // Step 4: Migrate each table using batched processing
    tracing::info!("Step 4/4: Migrating tables (batched processing)...");
    let mut migrated_rows = 0usize;

    for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}' ({} rows)",
            idx + 1,
            tables.len(),
            table_name,
            row_count
        );

        // Optional destructive reset of the target table before recreation
        if drop_existing {
            crate::jsonb::writer::drop_jsonb_table(&target_client, table_name)
                .await
                .with_context(|| format!("Failed to drop existing JSONB table '{}'", table_name))?;
        }

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        // Use batched conversion for memory efficiency
        let rows_processed = crate::sqlite::converter::convert_table_batched(
            &sqlite_conn,
            &target_client,
            table_name,
            "sqlite",
            None, // Use default batch size
        )
        .await
        .with_context(|| format!("Failed to migrate table '{}'", table_name))?;

        migrated_rows += rows_processed;

        if rows_processed > 0 {
            // Percentage shown is cumulative progress across all tables,
            // guarded against division by zero when counts were unavailable
            tracing::info!(
                "  ✓ Migrated {} rows from '{}' ({:.1}% of total)",
                rows_processed,
                table_name,
                if total_rows > 0 {
                    migrated_rows as f64 / total_rows as f64 * 100.0
                } else {
                    100.0
                }
            );
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to migrate)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} row(s) from {} table(s) in '{}'",
        migrated_rows,
        tables.len(),
        sqlite_path
    );

    Ok(())
}
1008
/// Initial replication from MongoDB to PostgreSQL
///
/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
/// 1. Validates MongoDB connection string
/// 2. Connects to MongoDB and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all collections from MongoDB database
/// 5. For each collection:
///    - Converts documents to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MongoDB data is stored as JSONB with metadata:
/// - id: Original _id field (ObjectId → hex, String/Int → string)
/// - data: Complete document as JSON object
/// - _source_type: "mongodb"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MongoDB connection string is invalid
/// - Cannot connect to MongoDB database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Collection conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mongodb_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mongodb_to_postgres(
///     "mongodb://localhost:27017/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    // Step 1: Validate and connect to MongoDB
    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!("  ✓ MongoDB connection validated");

    // Step 2: Extract database name
    // First context covers URL parse errors; second covers a syntactically
    // valid URL that simply has no database path segment
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all collections
    tracing::info!("Step 3/5: Discovering collections...");
    // Database handle is reused for per-collection document reads below
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    // An empty source database is a successful no-op, not an error
    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Migrate each collection
    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // Convert MongoDB collection to JSONB
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            "  ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        // Truncate existing data to make init idempotent (fixes #69)
        crate::jsonb::writer::truncate_jsonb_table(&target_client, collection_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", collection_name))?;

        tracing::info!(
            "  ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                "  ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}
1167
1168/// Initial replication from MySQL to PostgreSQL
1169///
1170/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1171/// 1. Validates MySQL connection string
1172/// 2. Connects to MySQL and verifies connection
1173/// 3. Extracts database name from connection string
1174/// 4. Lists all tables from MySQL database
1175/// 5. For each table:
1176///    - Converts rows to JSONB format
1177///    - Creates JSONB table in PostgreSQL
1178///    - Batch inserts all data
1179///
1180/// All MySQL data is stored as JSONB with metadata:
1181/// - id: Primary key or auto-generated ID
1182/// - data: Complete row as JSON object
1183/// - _source_type: "mysql"
1184/// - _migrated_at: Timestamp of replication
1185///
1186/// # Arguments
1187///
1188/// * `mysql_url` - MySQL connection string (mysql://...)
1189/// * `target_url` - PostgreSQL connection string for target (Seren) database
1190///
1191/// # Returns
1192///
1193/// Returns `Ok(())` if replication completes successfully.
1194///
1195/// # Errors
1196///
1197/// This function will return an error if:
1198/// - MySQL connection string is invalid
1199/// - Cannot connect to MySQL database
1200/// - Database name is not specified in connection string
1201/// - Cannot connect to target PostgreSQL database
1202/// - Table conversion fails
1203/// - Database creation or insert operations fail
1204///
1205/// # Examples
1206///
1207/// ```no_run
1208/// # use anyhow::Result;
1209/// # use database_replicator::commands::init::init_mysql_to_postgres;
1210/// # async fn example() -> Result<()> {
1211/// init_mysql_to_postgres(
1212///     "mysql://user:pass@localhost:3306/mydb",
1213///     "postgresql://user:pass@seren.example.com/targetdb"
1214/// ).await?;
1215/// # Ok(())
1216/// # }
1217/// ```
1218pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1219    tracing::info!("Starting MySQL to PostgreSQL replication...");
1220
1221    // Step 1: Validate and connect to MySQL
1222    tracing::info!("Step 1/5: Validating MySQL connection...");
1223    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1224        .await
1225        .context("MySQL connection failed")?;
1226    tracing::info!("  ✓ MySQL connection validated");
1227
1228    // Step 2: Extract database name
1229    tracing::info!("Step 2/5: Extracting database name...");
1230    let db_name = crate::mysql::extract_database_name(mysql_url)
1231        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1232    tracing::info!("  ✓ Database name: '{}'", db_name);
1233
1234    // Step 3: List all tables
1235    tracing::info!("Step 3/5: Discovering tables...");
1236    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1237        .await
1238        .context("Failed to list tables from MySQL database")?;
1239
1240    if tables.is_empty() {
1241        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1242        tracing::info!("✅ Replication complete (no tables to replicate)");
1243        return Ok(());
1244    }
1245
1246    tracing::info!("Found {} table(s) to replicate", tables.len());
1247
1248    // Step 4: Connect to PostgreSQL target
1249    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1250    let target_client = postgres::connect_with_retry(target_url).await?;
1251    tracing::info!("  ✓ Connected to PostgreSQL target");
1252
1253    // Step 5: Replicate each table
1254    tracing::info!("Step 5/5: Replicating tables...");
1255    for (idx, table_name) in tables.iter().enumerate() {
1256        tracing::info!(
1257            "Replicating table {}/{}: '{}'",
1258            idx + 1,
1259            tables.len(),
1260            table_name
1261        );
1262
1263        // Convert MySQL table to JSONB
1264        let rows =
1265            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1266                .await
1267                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1268
1269        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1270
1271        // Create JSONB table in PostgreSQL
1272        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1273            .await
1274            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1275
1276        // Truncate existing data to make init idempotent (fixes #69)
1277        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
1278            .await
1279            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;
1280
1281        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1282
1283        if !rows.is_empty() {
1284            // Batch insert all rows
1285            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1286                .await
1287                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1288
1289            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1290        } else {
1291            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1292        }
1293    }
1294
1295    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1296    tracing::info!(
1297        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1298        tables.len(),
1299        db_name
1300    );
1301
1302    Ok(())
1303}
1304
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        // Table-driven: with and without query parameters
        let cases = [
            (
                "postgresql://user:pass@host:5432/olddb?sslmode=require",
                "postgresql://user:pass@host:5432/newdb?sslmode=require",
            ),
            (
                "postgresql://user:pass@host:5432/olddb",
                "postgresql://user:pass@host:5432/newdb",
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(replace_database_in_url(input, "newdb").unwrap(), expected);
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // Connect to the database first
        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        // The target may or may not have user tables; this only verifies
        // that the query itself succeeds
        assert!(database_is_empty(&client).await.is_ok());
    }
}