// database_replicator/commands/init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
82#[allow(clippy::too_many_arguments)]
83pub async fn init(
84    source_url: &str,
85    target_url: &str,
86    skip_confirmation: bool,
87    filter: crate::filters::ReplicationFilter,
88    drop_existing: bool,
89    enable_sync: bool,
90    allow_resume: bool,
91    force_local: bool,
92) -> Result<()> {
93    tracing::info!("Starting initial replication...");
94
95    // Detect source database type and route to appropriate implementation
96    let source_type =
97        crate::detect_source_type(source_url).context("Failed to detect source database type")?;
98
99    match source_type {
100        crate::SourceType::PostgreSQL => {
101            // PostgreSQL to PostgreSQL replication (existing logic below)
102            tracing::info!("Source type: PostgreSQL");
103
104            // Run pre-flight checks before any destructive operations
105            tracing::info!("Running pre-flight checks...");
106
107            // Get explicit database list from filters (include_databases or extracted from include_tables)
108            let databases = filter.databases_to_check();
109            let preflight_result = crate::preflight::run_preflight_checks(
110                source_url,
111                target_url,
112                databases.as_deref(),
113            )
114            .await?;
115
116            preflight_result.print();
117
118            if !preflight_result.all_passed() {
119                // Check if we can auto-fallback to remote
120                if preflight_result.tool_version_incompatible
121                    && crate::utils::is_serendb_target(target_url)
122                    && !force_local
123                {
124                    println!();
125                    tracing::info!(
126                        "Tool version incompatible. Switching to SerenAI cloud execution..."
127                    );
128                    // Return special error that main.rs catches to trigger remote
129                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
130                }
131
132                // Cannot auto-fallback
133                if force_local {
134                    bail!(
135                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
136                         Fix the issues above or remove --local to allow remote execution."
137                    );
138                }
139
140                bail!("Pre-flight checks failed. Fix the issues above and retry.");
141            }
142
143            println!();
144        }
145        crate::SourceType::SQLite => {
146            // SQLite to PostgreSQL migration (simpler path)
147            tracing::info!("Source type: SQLite");
148
149            // SQLite migrations don't support PostgreSQL-specific features
150            if !filter.is_empty() {
151                tracing::warn!(
152                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
153                );
154            }
155            if drop_existing {
156                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
157            }
158            if !enable_sync {
159                tracing::warn!(
160                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
161                );
162            }
163
164            return init_sqlite_to_postgres(source_url, target_url).await;
165        }
166        crate::SourceType::MongoDB => {
167            // MongoDB to PostgreSQL migration (simpler path)
168            tracing::info!("Source type: MongoDB");
169
170            // MongoDB migrations don't support PostgreSQL-specific features
171            if !filter.is_empty() {
172                tracing::warn!(
173                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
174                );
175            }
176            if drop_existing {
177                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
178            }
179            if !enable_sync {
180                tracing::warn!(
181                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
182                );
183            }
184
185            return init_mongodb_to_postgres(source_url, target_url).await;
186        }
187        crate::SourceType::MySQL => {
188            // MySQL to PostgreSQL replication (simpler path)
189            tracing::info!("Source type: MySQL");
190
191            // MySQL replications don't support PostgreSQL-specific features
192            if !filter.is_empty() {
193                tracing::warn!(
194                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
195                );
196            }
197            if drop_existing {
198                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
199            }
200            if !enable_sync {
201                tracing::warn!(
202                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
203                );
204            }
205
206            return init_mysql_to_postgres(source_url, target_url).await;
207        }
208    }
209
210    // CRITICAL: Ensure source and target are different to prevent data loss
211    crate::utils::validate_source_target_different(source_url, target_url)
212        .context("Source and target validation failed")?;
213    tracing::info!("✓ Verified source and target are different databases");
214
215    // Create managed temporary directory for dump files
216    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
217    let temp_path =
218        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
219    tracing::debug!("Using temp directory: {}", temp_path.display());
220
221    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
222        .context("Failed to determine checkpoint location")?;
223
224    // Step 1: Dump global objects
225    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
226    let globals_file = temp_path.join("globals.sql");
227    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
228    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
229        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
230    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
231        .context("Failed to remove SUPERUSER from globals dump")?;
232    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
233        .context("Failed to remove restricted parameter settings from globals dump")?;
234    remove_restricted_role_grants(globals_file.to_str().unwrap())
235        .context("Failed to remove restricted role grants from globals dump")?;
236    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
237        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;
238
239    // Step 2: Restore global objects
240    tracing::info!("Step 2/4: Restoring global objects to target...");
241    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;
242
243    // Step 3: Discover and filter databases
244    tracing::info!("Step 3/4: Discovering databases...");
245    let all_databases = {
246        // Scope the connection so it's dropped before subprocess operations
247        let source_client = postgres::connect_with_retry(source_url).await?;
248        migration::list_databases(&source_client).await?
249    }; // Connection dropped here
250
251    // Apply filtering rules
252    let databases: Vec<_> = all_databases
253        .into_iter()
254        .filter(|db| filter.should_replicate_database(&db.name))
255        .collect();
256
257    if databases.is_empty() {
258        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
259        if filter.is_empty() {
260            tracing::warn!("⚠ No user databases found on source");
261            tracing::warn!("  This is unusual - the source database appears empty");
262            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
263        } else {
264            tracing::warn!("⚠ No databases matched the filter criteria");
265            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
266        }
267        tracing::info!("✅ Initial replication complete (no databases to replicate)");
268        return Ok(());
269    }
270
271    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
272    let filter_hash = filter.fingerprint();
273    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
274        source_url,
275        target_url,
276        filter_hash,
277        drop_existing,
278        enable_sync,
279    );
280
281    let mut checkpoint_state = if allow_resume {
282        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
283            Some(existing) => {
284                // Try to validate the checkpoint
285                match existing.validate(&checkpoint_metadata, &database_names) {
286                    Ok(()) => {
287                        // Validation succeeded - resume from checkpoint
288                        if existing.completed_count() > 0 {
289                            tracing::info!(
290                                "Resume checkpoint found: {}/{} databases already replicated",
291                                existing.completed_count(),
292                                existing.total_databases()
293                            );
294                        } else {
295                            tracing::info!(
296                                "Resume checkpoint found but no databases marked complete yet"
297                            );
298                        }
299                        existing
300                    }
301                    Err(e) => {
302                        // Validation failed - log warning and start fresh
303                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
304                        tracing::warn!(
305                            "  Previous run configuration differs from current configuration"
306                        );
307                        tracing::warn!("  - Schema-only tables may have changed");
308                        tracing::warn!("  - Time filters may have changed");
309                        tracing::warn!("  - Table selection may have changed");
310                        tracing::warn!("  Error: {}", e);
311                        tracing::info!("");
312                        tracing::info!(
313                            "✓ Automatically discarding old checkpoint and starting fresh"
314                        );
315                        checkpoint::remove_checkpoint(&checkpoint_path)?;
316                        checkpoint::InitCheckpoint::new(
317                            checkpoint_metadata.clone(),
318                            &database_names,
319                        )
320                    }
321                }
322            }
323            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
324        }
325    } else {
326        if checkpoint_path.exists() {
327            tracing::info!(
328                "--no-resume supplied: discarding previous checkpoint at {}",
329                checkpoint_path.display()
330            );
331        }
332        checkpoint::remove_checkpoint(&checkpoint_path)?;
333        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
334    };
335
336    // Persist baseline state so crashes before first database can resume cleanly
337    checkpoint_state
338        .save(&checkpoint_path)
339        .context("Failed to persist checkpoint state")?;
340
341    tracing::info!("Found {} database(s) to replicate", databases.len());
342
343    // Estimate database sizes and get confirmation
344    if !skip_confirmation {
345        tracing::info!("Analyzing database sizes...");
346        let size_estimates = {
347            // Scope the connection so it's dropped after size estimation
348            let source_client = postgres::connect_with_retry(source_url).await?;
349            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
350                .await?
351        }; // Connection dropped here
352
353        if !confirm_replication(&size_estimates)? {
354            bail!("Replication cancelled by user");
355        }
356    }
357
358    // Step 4: Replicate each database
359    tracing::info!("Step 4/4: Replicating databases...");
360    for (idx, db_info) in databases.iter().enumerate() {
361        let filtered_tables = filter.predicate_tables(&db_info.name);
362        if checkpoint_state.is_completed(&db_info.name) {
363            tracing::info!(
364                "Skipping database '{}' (already completed per checkpoint)",
365                db_info.name
366            );
367            continue;
368        }
369        tracing::info!(
370            "Replicating database {}/{}: '{}'",
371            idx + 1,
372            databases.len(),
373            db_info.name
374        );
375
376        // Build connection URLs for this specific database
377        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
378        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;
379
380        // Handle database creation atomically to avoid TOCTOU race condition
381        // Scope the connection so it's dropped before dump/restore subprocess operations
382        {
383            let target_client = postgres::connect_with_retry(target_url).await?;
384
385            // Validate database name to prevent SQL injection
386            crate::utils::validate_postgres_identifier(&db_info.name)
387                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;
388
389            // Try to create database atomically (avoids TOCTOU vulnerability)
390            let create_query = format!(
391                "CREATE DATABASE {}",
392                crate::utils::quote_ident(&db_info.name)
393            );
394            match target_client.execute(&create_query, &[]).await {
395                Ok(_) => {
396                    tracing::info!("  Created database '{}'", db_info.name);
397                }
398                Err(err) => {
399                    // Check if error is "database already exists" (error code 42P04)
400                    if let Some(db_error) = err.as_db_error() {
401                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
402                            // Database already exists - handle based on user preferences
403                            tracing::info!(
404                                "  Database '{}' already exists on target",
405                                db_info.name
406                            );
407
408                            // Check if empty by connecting to the SPECIFIC database
409                            // (target_client is connected to default db, not the target db)
410                            let is_empty = {
411                                let db_client =
412                                    postgres::connect_with_retry(&target_db_url).await?;
413                                database_is_empty(&db_client).await?
414                            }; // db_client dropped here
415
416                            if is_empty {
417                                tracing::info!(
418                                    "  Database '{}' is empty, proceeding with restore",
419                                    db_info.name
420                                );
421                            } else {
422                                // Database exists and has data
423                                let should_drop = if drop_existing {
424                                    // Force drop with --drop-existing flag
425                                    true
426                                } else if skip_confirmation {
427                                    // Auto-confirm drop with --yes flag (non-interactive)
428                                    tracing::info!(
429                                        "  Auto-confirming drop for database '{}' (--yes flag)",
430                                        db_info.name
431                                    );
432                                    true
433                                } else {
434                                    // Interactive mode: prompt user
435                                    prompt_drop_database(&db_info.name)?
436                                };
437
438                                if should_drop {
439                                    drop_database_if_exists(&target_client, &db_info.name).await?;
440
441                                    // Recreate the database
442                                    let create_query = format!(
443                                        "CREATE DATABASE {}",
444                                        crate::utils::quote_ident(&db_info.name)
445                                    );
446                                    target_client
447                                        .execute(&create_query, &[])
448                                        .await
449                                        .with_context(|| {
450                                            format!(
451                                                "Failed to create database '{}' after drop",
452                                                db_info.name
453                                            )
454                                        })?;
455                                    tracing::info!("  Created database '{}'", db_info.name);
456                                } else {
457                                    bail!("Aborted: Database '{}' already exists", db_info.name);
458                                }
459                            }
460                        } else {
461                            // Some other database error - propagate it
462                            return Err(err).with_context(|| {
463                                format!("Failed to create database '{}'", db_info.name)
464                            });
465                        }
466                    } else {
467                        // Not a database error - propagate it
468                        return Err(err).with_context(|| {
469                            format!("Failed to create database '{}'", db_info.name)
470                        });
471                    }
472                }
473            }
474        } // Connection dropped here before dump/restore operations
475
476        // Dump and restore schema
477        tracing::info!("  Dumping schema for '{}'...", db_info.name);
478        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
479        migration::dump_schema(
480            &source_db_url,
481            &db_info.name,
482            schema_file.to_str().unwrap(),
483            &filter,
484        )
485        .await?;
486
487        tracing::info!("  Restoring schema for '{}'...", db_info.name);
488        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;
489
490        // Dump and restore data (using directory format for parallel operations)
491        tracing::info!("  Dumping data for '{}'...", db_info.name);
492        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
493        migration::dump_data(
494            &source_db_url,
495            &db_info.name,
496            data_dir.to_str().unwrap(),
497            &filter,
498        )
499        .await?;
500
501        tracing::info!("  Restoring data for '{}'...", db_info.name);
502        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;
503
504        if !filtered_tables.is_empty() {
505            tracing::info!(
506                "  Applying filtered replication for {} table(s)...",
507                filtered_tables.len()
508            );
509            migration::filtered::copy_filtered_tables(
510                &source_db_url,
511                &target_db_url,
512                &filtered_tables,
513            )
514            .await?;
515        }
516
517        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);
518
519        checkpoint_state.mark_completed(&db_info.name);
520        checkpoint_state
521            .save(&checkpoint_path)
522            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
523    }
524
525    // Explicitly clean up temp directory
526    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
527    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
528        tracing::warn!("Failed to clean up temp directory: {}", e);
529        // Don't fail the entire operation if cleanup fails
530    }
531
532    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
533        tracing::warn!("Failed to remove checkpoint state: {}", err);
534    }
535
536    tracing::info!("✅ Initial replication complete");
537
538    // Check wal_level before attempting to set up sync
539    let mut should_enable_sync = enable_sync;
540    if enable_sync {
541        tracing::info!("Checking target wal_level for logical replication...");
542        let target_wal_level = {
543            // Scope the connection for quick wal_level check
544            let target_client = postgres::connect_with_retry(target_url).await?;
545            postgres::check_wal_level(&target_client).await?
546        }; // Connection dropped here
547
548        if target_wal_level != "logical" {
549            tracing::warn!("");
550            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
551            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
552            tracing::warn!("");
553            tracing::warn!("  To fix this:");
554            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
555            tracing::warn!("    2. Restart PostgreSQL server");
556            tracing::warn!(
557                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
558            );
559            tracing::warn!("");
560            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
561            should_enable_sync = false;
562        }
563    }
564
565    // Set up continuous logical replication if enabled
566    if should_enable_sync {
567        tracing::info!("");
568        tracing::info!("========================================");
569        tracing::info!("Step 5/5: Setting up continuous replication...");
570        tracing::info!("========================================");
571        tracing::info!("");
572
573        // Call sync command with the same filter
574        crate::commands::sync(
575            source_url,
576            target_url,
577            Some(filter),
578            None,
579            None,
580            None,
581            false,
582        )
583        .await
584        .context("Failed to set up continuous replication")?;
585
586        tracing::info!("");
587        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
588    } else {
589        tracing::info!("");
590        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
591        tracing::info!("  To enable it later, run:");
592        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
593    }
594
595    Ok(())
596}
597
598/// Replace the database name in a connection URL
599fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
600    // Parse URL to find database name
601    // Format: postgresql://user:pass@host:port/database?params
602
603    // Split by '?' to separate params
604    let parts: Vec<&str> = url.split('?').collect();
605    let base_url = parts[0];
606    let params = if parts.len() > 1 {
607        Some(parts[1])
608    } else {
609        None
610    };
611
612    // Split base by '/' to get everything before database name
613    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
614    if url_parts.len() != 2 {
615        anyhow::bail!("Invalid connection URL format");
616    }
617
618    // Reconstruct URL with new database name
619    let mut new_url = format!("{}/{}", url_parts[1], new_database);
620    if let Some(p) = params {
621        new_url = format!("{}?{}", new_url, p);
622    }
623
624    Ok(new_url)
625}
626
627/// Display database size estimates and prompt for confirmation
628///
629/// Shows a table with database names, sizes, and estimated replication times.
630/// Prompts the user to proceed with the replication.
631///
632/// # Arguments
633///
634/// * `sizes` - Vector of database size estimates
635///
636/// # Returns
637///
638/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
639///
640/// # Errors
641///
642/// Returns an error if stdin/stdout operations fail.
643fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
644    use std::time::Duration;
645
646    // Calculate totals
647    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
648    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
649
650    // Print table header
651    println!();
652    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
653    println!("{}", "─".repeat(50));
654
655    // Print each database
656    for size in sizes {
657        println!(
658            "{:<20} {:<12} {:<15}",
659            size.name,
660            size.size_human,
661            migration::format_duration(size.estimated_duration)
662        );
663    }
664
665    // Print totals
666    println!("{}", "─".repeat(50));
667    println!(
668        "Total: {} (estimated {})",
669        migration::format_bytes(total_bytes),
670        migration::format_duration(total_duration)
671    );
672    println!();
673
674    // Prompt for confirmation
675    print!("Proceed with replication? [y/N]: ");
676    io::stdout().flush()?;
677
678    let mut input = String::new();
679    io::stdin()
680        .read_line(&mut input)
681        .context("Failed to read user input")?;
682
683    Ok(input.trim().to_lowercase() == "y")
684}
685
686/// Checks if the currently connected database is empty (has no user tables).
687///
688/// Includes a 30-second timeout to prevent hanging on stale serverless connections.
689async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
690    let query = "
691        SELECT COUNT(*)
692        FROM information_schema.tables
693        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
694    ";
695
696    // Add timeout to prevent indefinite hang on stale serverless connections
697    let row = tokio::time::timeout(
698        std::time::Duration::from_secs(30),
699        client.query_one(query, &[]),
700    )
701    .await
702    .context("database_is_empty query timed out after 30 seconds")?
703    .context("Failed to query information_schema.tables")?;
704
705    let count: i64 = row.get(0);
706
707    Ok(count == 0)
708}
709
710/// Prompts user to drop existing database
711fn prompt_drop_database(db_name: &str) -> Result<bool> {
712    use std::io::{self, Write};
713
714    print!(
715        "\nWarning: Database '{}' already exists on target and contains data.\n\
716         Drop and recreate database? This will delete all existing data. [y/N]: ",
717        db_name
718    );
719    io::stdout().flush()?;
720
721    let mut input = String::new();
722    io::stdin().read_line(&mut input)?;
723
724    Ok(input.trim().eq_ignore_ascii_case("y"))
725}
726
727/// Drops a database if it exists
728async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
729    // Validate database name to prevent SQL injection
730    crate::utils::validate_postgres_identifier(db_name)
731        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
732
733    tracing::info!("  Dropping existing database '{}'...", db_name);
734
735    // Terminate existing connections to the database
736    // Skip connections owned by SUPERUSER roles (we can't terminate those on managed PostgreSQL)
737    let terminate_query = "
738        SELECT pg_terminate_backend(sa.pid)
739        FROM pg_stat_activity sa
740        JOIN pg_roles r ON sa.usename = r.rolname
741        WHERE sa.datname = $1
742          AND sa.pid <> pg_backend_pid()
743          AND NOT r.rolsuper
744    ";
745    target_conn.execute(terminate_query, &[&db_name]).await?;
746
747    // Check if any connections remain (including SUPERUSER connections we couldn't terminate)
748    let remaining_query = "
749        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
750        FROM pg_stat_activity sa
751        WHERE sa.datname = $1
752          AND sa.pid <> pg_backend_pid()
753    ";
754    let row = target_conn
755        .query_one(remaining_query, &[&db_name])
756        .await
757        .context("Failed to check remaining connections")?;
758    let remaining_count: i64 = row.get(0);
759    let remaining_users: Option<String> = row.get(1);
760
761    if remaining_count > 0 {
762        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
763        bail!(
764            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
765             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
766             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
767             processes maintain superuser connections.\n\n\
768             To resolve this:\n\
769             1. Wait a few minutes and retry (system connections may be temporary)\n\
770             2. Ask your database administrator to terminate the blocking sessions:\n\
771                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
772             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
773            db_name,
774            remaining_count,
775            users,
776            db_name
777        );
778    }
779
780    // Drop the database
781    let drop_query = format!(
782        "DROP DATABASE IF EXISTS {}",
783        crate::utils::quote_ident(db_name)
784    );
785    target_conn
786        .execute(&drop_query, &[])
787        .await
788        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
789
790    tracing::info!("  ✓ Database '{}' dropped", db_name);
791    Ok(())
792}
793
794/// Initial replication from SQLite to PostgreSQL
795///
796/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
797/// 1. Validates SQLite file exists and is readable
798/// 2. Validates PostgreSQL target connection
799/// 3. Lists all tables from SQLite database
800/// 4. For each table:
801///    - Converts rows to JSONB format
802///    - Creates JSONB table in PostgreSQL
803///    - Batch inserts all data
804///
805/// All SQLite data is stored as JSONB with metadata:
806/// - id: Original row ID (from ID column or row number)
807/// - data: Complete row as JSON object
808/// - _source_type: "sqlite"
809/// - _migrated_at: Timestamp of migration
810///
811/// # Arguments
812///
813/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
814/// * `target_url` - PostgreSQL connection string for target (Seren) database
815///
816/// # Returns
817///
818/// Returns `Ok(())` if migration completes successfully.
819///
820/// # Errors
821///
822/// This function will return an error if:
823/// - SQLite file doesn't exist or isn't readable
824/// - Cannot connect to target PostgreSQL database
825/// - Table conversion fails
826/// - Database creation or insert operations fail
827///
828/// # Examples
829///
830/// ```no_run
831/// # use anyhow::Result;
832/// # use database_replicator::commands::init::init_sqlite_to_postgres;
833/// # async fn example() -> Result<()> {
834/// init_sqlite_to_postgres(
835///     "database.db",
836///     "postgresql://user:pass@seren.example.com/targetdb"
837/// ).await?;
838/// # Ok(())
839/// # }
840/// ```
841pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
842    tracing::info!("Starting SQLite to PostgreSQL migration...");
843
844    // Step 1: Validate SQLite file
845    tracing::info!("Step 1/4: Validating SQLite database...");
846    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
847        .context("SQLite file validation failed")?;
848    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
849
850    // Step 2: Open SQLite connection (read-only)
851    tracing::info!("Step 2/4: Opening SQLite database...");
852    let sqlite_conn =
853        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
854    tracing::info!("  ✓ SQLite database opened (read-only mode)");
855
856    // Step 3: List all tables
857    tracing::info!("Step 3/4: Discovering tables...");
858    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
859        .context("Failed to list tables from SQLite database")?;
860
861    if tables.is_empty() {
862        tracing::warn!("⚠ No tables found in SQLite database");
863        tracing::info!("✅ Migration complete (no tables to migrate)");
864        return Ok(());
865    }
866
867    tracing::info!("Found {} table(s) to migrate", tables.len());
868
869    // Connect to PostgreSQL target
870    let target_client = postgres::connect_with_retry(target_url).await?;
871    tracing::info!("  ✓ Connected to PostgreSQL target");
872
873    // Step 4: Migrate each table
874    tracing::info!("Step 4/4: Migrating tables...");
875    for (idx, table_name) in tables.iter().enumerate() {
876        tracing::info!(
877            "Migrating table {}/{}: '{}'",
878            idx + 1,
879            tables.len(),
880            table_name
881        );
882
883        // Convert SQLite table to JSONB
884        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
885            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
886
887        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
888
889        // Create JSONB table in PostgreSQL
890        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
891            .await
892            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
893
894        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
895
896        if !rows.is_empty() {
897            // Batch insert all rows
898            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
899                .await
900                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
901
902            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
903        } else {
904            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
905        }
906    }
907
908    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
909    tracing::info!(
910        "   Migrated {} table(s) from '{}' to PostgreSQL",
911        tables.len(),
912        sqlite_path
913    );
914
915    Ok(())
916}
917
918/// Initial replication from MongoDB to PostgreSQL
919///
920/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
921/// 1. Validates MongoDB connection string
922/// 2. Connects to MongoDB and verifies connection
923/// 3. Extracts database name from connection string
924/// 4. Lists all collections from MongoDB database
925/// 5. For each collection:
926///    - Converts documents to JSONB format
927///    - Creates JSONB table in PostgreSQL
928///    - Batch inserts all data
929///
930/// All MongoDB data is stored as JSONB with metadata:
931/// - id: Original _id field (ObjectId → hex, String/Int → string)
932/// - data: Complete document as JSON object
933/// - _source_type: "mongodb"
934/// - _migrated_at: Timestamp of migration
935///
936/// # Arguments
937///
938/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
939/// * `target_url` - PostgreSQL connection string for target (Seren) database
940///
941/// # Returns
942///
943/// Returns `Ok(())` if migration completes successfully.
944///
945/// # Errors
946///
947/// This function will return an error if:
948/// - MongoDB connection string is invalid
949/// - Cannot connect to MongoDB database
950/// - Database name is not specified in connection string
951/// - Cannot connect to target PostgreSQL database
952/// - Collection conversion fails
953/// - Database creation or insert operations fail
954///
955/// # Examples
956///
957/// ```no_run
958/// # use anyhow::Result;
959/// # use database_replicator::commands::init::init_mongodb_to_postgres;
960/// # async fn example() -> Result<()> {
961/// init_mongodb_to_postgres(
962///     "mongodb://localhost:27017/mydb",
963///     "postgresql://user:pass@seren.example.com/targetdb"
964/// ).await?;
965/// # Ok(())
966/// # }
967/// ```
968pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
969    tracing::info!("Starting MongoDB to PostgreSQL migration...");
970
971    // Step 1: Validate and connect to MongoDB
972    tracing::info!("Step 1/5: Validating MongoDB connection...");
973    let client = crate::mongodb::connect_mongodb(mongo_url)
974        .await
975        .context("MongoDB connection failed")?;
976    tracing::info!("  ✓ MongoDB connection validated");
977
978    // Step 2: Extract database name
979    tracing::info!("Step 2/5: Extracting database name...");
980    let db_name = crate::mongodb::extract_database_name(mongo_url)
981        .await
982        .context("Failed to parse MongoDB connection string")?
983        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
984    tracing::info!("  ✓ Database name: '{}'", db_name);
985
986    // Step 3: List all collections
987    tracing::info!("Step 3/5: Discovering collections...");
988    let db = client.database(&db_name);
989    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
990        .await
991        .context("Failed to list collections from MongoDB database")?;
992
993    if collections.is_empty() {
994        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
995        tracing::info!("✅ Migration complete (no collections to migrate)");
996        return Ok(());
997    }
998
999    tracing::info!("Found {} collection(s) to migrate", collections.len());
1000
1001    // Step 4: Connect to PostgreSQL target
1002    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1003    let target_client = postgres::connect_with_retry(target_url).await?;
1004    tracing::info!("  ✓ Connected to PostgreSQL target");
1005
1006    // Step 5: Migrate each collection
1007    tracing::info!("Step 5/5: Migrating collections...");
1008    for (idx, collection_name) in collections.iter().enumerate() {
1009        tracing::info!(
1010            "Migrating collection {}/{}: '{}'",
1011            idx + 1,
1012            collections.len(),
1013            collection_name
1014        );
1015
1016        // Convert MongoDB collection to JSONB
1017        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
1018            .await
1019            .with_context(|| {
1020                format!(
1021                    "Failed to convert collection '{}' to JSONB",
1022                    collection_name
1023                )
1024            })?;
1025
1026        tracing::info!(
1027            "  ✓ Converted {} documents from '{}'",
1028            rows.len(),
1029            collection_name
1030        );
1031
1032        // Create JSONB table in PostgreSQL
1033        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
1034            .await
1035            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
1036
1037        tracing::info!(
1038            "  ✓ Created JSONB table '{}' in PostgreSQL",
1039            collection_name
1040        );
1041
1042        if !rows.is_empty() {
1043            // Batch insert all rows
1044            crate::jsonb::writer::insert_jsonb_batch(
1045                &target_client,
1046                collection_name,
1047                rows,
1048                "mongodb",
1049            )
1050            .await
1051            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1052
1053            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1054        } else {
1055            tracing::info!(
1056                "  ✓ Collection '{}' is empty (no documents to insert)",
1057                collection_name
1058            );
1059        }
1060    }
1061
1062    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1063    tracing::info!(
1064        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1065        collections.len(),
1066        db_name
1067    );
1068
1069    Ok(())
1070}
1071
1072/// Initial replication from MySQL to PostgreSQL
1073///
1074/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1075/// 1. Validates MySQL connection string
1076/// 2. Connects to MySQL and verifies connection
1077/// 3. Extracts database name from connection string
1078/// 4. Lists all tables from MySQL database
1079/// 5. For each table:
1080///    - Converts rows to JSONB format
1081///    - Creates JSONB table in PostgreSQL
1082///    - Batch inserts all data
1083///
1084/// All MySQL data is stored as JSONB with metadata:
1085/// - id: Primary key or auto-generated ID
1086/// - data: Complete row as JSON object
1087/// - _source_type: "mysql"
1088/// - _migrated_at: Timestamp of replication
1089///
1090/// # Arguments
1091///
1092/// * `mysql_url` - MySQL connection string (mysql://...)
1093/// * `target_url` - PostgreSQL connection string for target (Seren) database
1094///
1095/// # Returns
1096///
1097/// Returns `Ok(())` if replication completes successfully.
1098///
1099/// # Errors
1100///
1101/// This function will return an error if:
1102/// - MySQL connection string is invalid
1103/// - Cannot connect to MySQL database
1104/// - Database name is not specified in connection string
1105/// - Cannot connect to target PostgreSQL database
1106/// - Table conversion fails
1107/// - Database creation or insert operations fail
1108///
1109/// # Examples
1110///
1111/// ```no_run
1112/// # use anyhow::Result;
1113/// # use database_replicator::commands::init::init_mysql_to_postgres;
1114/// # async fn example() -> Result<()> {
1115/// init_mysql_to_postgres(
1116///     "mysql://user:pass@localhost:3306/mydb",
1117///     "postgresql://user:pass@seren.example.com/targetdb"
1118/// ).await?;
1119/// # Ok(())
1120/// # }
1121/// ```
1122pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1123    tracing::info!("Starting MySQL to PostgreSQL replication...");
1124
1125    // Step 1: Validate and connect to MySQL
1126    tracing::info!("Step 1/5: Validating MySQL connection...");
1127    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1128        .await
1129        .context("MySQL connection failed")?;
1130    tracing::info!("  ✓ MySQL connection validated");
1131
1132    // Step 2: Extract database name
1133    tracing::info!("Step 2/5: Extracting database name...");
1134    let db_name = crate::mysql::extract_database_name(mysql_url)
1135        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1136    tracing::info!("  ✓ Database name: '{}'", db_name);
1137
1138    // Step 3: List all tables
1139    tracing::info!("Step 3/5: Discovering tables...");
1140    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1141        .await
1142        .context("Failed to list tables from MySQL database")?;
1143
1144    if tables.is_empty() {
1145        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1146        tracing::info!("✅ Replication complete (no tables to replicate)");
1147        return Ok(());
1148    }
1149
1150    tracing::info!("Found {} table(s) to replicate", tables.len());
1151
1152    // Step 4: Connect to PostgreSQL target
1153    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1154    let target_client = postgres::connect_with_retry(target_url).await?;
1155    tracing::info!("  ✓ Connected to PostgreSQL target");
1156
1157    // Step 5: Replicate each table
1158    tracing::info!("Step 5/5: Replicating tables...");
1159    for (idx, table_name) in tables.iter().enumerate() {
1160        tracing::info!(
1161            "Replicating table {}/{}: '{}'",
1162            idx + 1,
1163            tables.len(),
1164            table_name
1165        );
1166
1167        // Convert MySQL table to JSONB
1168        let rows =
1169            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1170                .await
1171                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1172
1173        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1174
1175        // Create JSONB table in PostgreSQL
1176        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1177            .await
1178            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1179
1180        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1181
1182        if !rows.is_empty() {
1183            // Batch insert all rows
1184            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1185                .await
1186                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1187
1188            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1189        } else {
1190            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1191        }
1192    }
1193
1194    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1195    tracing::info!(
1196        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1197        tables.len(),
1198        db_name
1199    );
1200
1201    Ok(())
1202}
1203
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end replication against real databases; requires
    /// TEST_SOURCE_URL and TEST_TARGET_URL, hence `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome =
            init(&source_url, &target_url, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        // Query parameters must survive the database-name swap.
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // Also works when the URL carries no query string.
        let without_params = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(without_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    /// Smoke test against a live target; requires TEST_TARGET_URL,
    /// hence `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let target_url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&target_url)
            .await
            .expect("Failed to connect");

        // The postgres database may or may not hold user tables; this only
        // verifies the query executes without error.
        assert!(database_is_empty(&client).await.is_ok());
    }
}