database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10/// Initial replication command for snapshot schema and data copy
11///
12/// Performs a full database dump and restore from source to target in steps:
13/// 1. Estimates database sizes and replication times
14/// 2. Prompts for confirmation (unless skip_confirmation is true)
15/// 3. Dumps global objects (roles, tablespaces) from source
16/// 4. Restores global objects to target
17/// 5. Discovers all user databases on source
18/// 6. Replicates each database (schema and data)
19/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
20///
21/// Uses temporary directory for dump files, which is automatically cleaned up.
22///
23/// # Arguments
24///
25/// * `source_url` - PostgreSQL connection string for source (Neon) database
26/// * `target_url` - PostgreSQL connection string for target (Seren) database
27/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
28/// * `filter` - Database and table filtering rules
29/// * `drop_existing` - Drop existing databases on target before copying
30/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
31/// * `allow_resume` - Resume from checkpoint if available (default: true)
32/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
33///
34/// # Returns
35///
36/// Returns `Ok(())` if replication completes successfully.
37///
38/// # Errors
39///
40/// This function will return an error if:
41/// - Cannot create temporary directory
42/// - Global objects dump/restore fails
43/// - Cannot connect to source database
44/// - Database discovery fails
45/// - Any database replication fails
46/// - User declines confirmation prompt
47/// - Logical replication setup fails (if enable_sync is true)
48///
49/// # Examples
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use database_replicator::commands::init;
54/// # use database_replicator::filters::ReplicationFilter;
55/// # async fn example() -> Result<()> {
56/// // With confirmation prompt and automatic sync (default)
57/// init(
58///     "postgresql://user:pass@neon.tech/sourcedb",
59///     "postgresql://user:pass@seren.example.com/targetdb",
60///     false,
61///     ReplicationFilter::empty(),
62///     false,
63///     true,   // Enable continuous replication
64///     true,   // Allow resume
65///     false,  // Not forcing local execution
66/// ).await?;
67///
68/// // Snapshot only (no continuous replication)
69/// init(
70///     "postgresql://user:pass@neon.tech/sourcedb",
71///     "postgresql://user:pass@seren.example.com/targetdb",
72///     true,
73///     ReplicationFilter::empty(),
74///     false,
75///     false,  // Disable continuous replication
76///     true,   // Allow resume
77///     true,   // Force local execution (--local flag)
78/// ).await?;
79/// # Ok(())
80/// # }
81/// ```
82#[allow(clippy::too_many_arguments)]
83pub async fn init(
84    source_url: &str,
85    target_url: &str,
86    skip_confirmation: bool,
87    filter: crate::filters::ReplicationFilter,
88    drop_existing: bool,
89    enable_sync: bool,
90    allow_resume: bool,
91    force_local: bool,
92) -> Result<()> {
93    tracing::info!("Starting initial replication...");
94
95    // Detect source database type and route to appropriate implementation
96    let source_type =
97        crate::detect_source_type(source_url).context("Failed to detect source database type")?;
98
99    match source_type {
100        crate::SourceType::PostgreSQL => {
101            // PostgreSQL to PostgreSQL replication (existing logic below)
102            tracing::info!("Source type: PostgreSQL");
103
104            // Run pre-flight checks before any destructive operations
105            tracing::info!("Running pre-flight checks...");
106
107            let databases = filter.include_databases().map(|v| v.to_vec());
108            let preflight_result = crate::preflight::run_preflight_checks(
109                source_url,
110                target_url,
111                databases.as_deref(),
112            )
113            .await?;
114
115            preflight_result.print();
116
117            if !preflight_result.all_passed() {
118                // Check if we can auto-fallback to remote
119                if preflight_result.tool_version_incompatible
120                    && crate::utils::is_serendb_target(target_url)
121                    && !force_local
122                {
123                    println!();
124                    tracing::info!(
125                        "Tool version incompatible. Switching to SerenAI cloud execution..."
126                    );
127                    // Return special error that main.rs catches to trigger remote
128                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
129                }
130
131                // Cannot auto-fallback
132                if force_local {
133                    bail!(
134                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
135                         Fix the issues above or remove --local to allow remote execution."
136                    );
137                }
138
139                bail!("Pre-flight checks failed. Fix the issues above and retry.");
140            }
141
142            println!();
143        }
144        crate::SourceType::SQLite => {
145            // SQLite to PostgreSQL migration (simpler path)
146            tracing::info!("Source type: SQLite");
147
148            // SQLite migrations don't support PostgreSQL-specific features
149            if !filter.is_empty() {
150                tracing::warn!(
151                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
152                );
153            }
154            if drop_existing {
155                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
156            }
157            if !enable_sync {
158                tracing::warn!(
159                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
160                );
161            }
162
163            return init_sqlite_to_postgres(source_url, target_url).await;
164        }
165        crate::SourceType::MongoDB => {
166            // MongoDB to PostgreSQL migration (simpler path)
167            tracing::info!("Source type: MongoDB");
168
169            // MongoDB migrations don't support PostgreSQL-specific features
170            if !filter.is_empty() {
171                tracing::warn!(
172                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
173                );
174            }
175            if drop_existing {
176                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
177            }
178            if !enable_sync {
179                tracing::warn!(
180                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
181                );
182            }
183
184            return init_mongodb_to_postgres(source_url, target_url).await;
185        }
186        crate::SourceType::MySQL => {
187            // MySQL to PostgreSQL replication (simpler path)
188            tracing::info!("Source type: MySQL");
189
190            // MySQL replications don't support PostgreSQL-specific features
191            if !filter.is_empty() {
192                tracing::warn!(
193                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
194                );
195            }
196            if drop_existing {
197                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
198            }
199            if !enable_sync {
200                tracing::warn!(
201                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
202                );
203            }
204
205            return init_mysql_to_postgres(source_url, target_url).await;
206        }
207    }
208
209    // CRITICAL: Ensure source and target are different to prevent data loss
210    crate::utils::validate_source_target_different(source_url, target_url)
211        .context("Source and target validation failed")?;
212    tracing::info!("✓ Verified source and target are different databases");
213
214    // Create managed temporary directory for dump files
215    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
216    let temp_path =
217        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
218    tracing::debug!("Using temp directory: {}", temp_path.display());
219
220    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
221        .context("Failed to determine checkpoint location")?;
222
223    // Step 1: Dump global objects
224    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
225    let globals_file = temp_path.join("globals.sql");
226    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
227    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
228        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
229    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
230        .context("Failed to remove SUPERUSER from globals dump")?;
231    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
232        .context("Failed to remove restricted parameter settings from globals dump")?;
233    remove_restricted_role_grants(globals_file.to_str().unwrap())
234        .context("Failed to remove restricted role grants from globals dump")?;
235    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
236        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;
237
238    // Step 2: Restore global objects
239    tracing::info!("Step 2/4: Restoring global objects to target...");
240    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;
241
242    // Step 3: Discover and filter databases
243    tracing::info!("Step 3/4: Discovering databases...");
244    let all_databases = {
245        // Scope the connection so it's dropped before subprocess operations
246        let source_client = postgres::connect_with_retry(source_url).await?;
247        migration::list_databases(&source_client).await?
248    }; // Connection dropped here
249
250    // Apply filtering rules
251    let databases: Vec<_> = all_databases
252        .into_iter()
253        .filter(|db| filter.should_replicate_database(&db.name))
254        .collect();
255
256    if databases.is_empty() {
257        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
258        if filter.is_empty() {
259            tracing::warn!("⚠ No user databases found on source");
260            tracing::warn!("  This is unusual - the source database appears empty");
261            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
262        } else {
263            tracing::warn!("⚠ No databases matched the filter criteria");
264            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
265        }
266        tracing::info!("✅ Initial replication complete (no databases to replicate)");
267        return Ok(());
268    }
269
270    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
271    let filter_hash = filter.fingerprint();
272    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
273        source_url,
274        target_url,
275        filter_hash,
276        drop_existing,
277        enable_sync,
278    );
279
280    let mut checkpoint_state = if allow_resume {
281        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
282            Some(existing) => {
283                // Try to validate the checkpoint
284                match existing.validate(&checkpoint_metadata, &database_names) {
285                    Ok(()) => {
286                        // Validation succeeded - resume from checkpoint
287                        if existing.completed_count() > 0 {
288                            tracing::info!(
289                                "Resume checkpoint found: {}/{} databases already replicated",
290                                existing.completed_count(),
291                                existing.total_databases()
292                            );
293                        } else {
294                            tracing::info!(
295                                "Resume checkpoint found but no databases marked complete yet"
296                            );
297                        }
298                        existing
299                    }
300                    Err(e) => {
301                        // Validation failed - log warning and start fresh
302                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
303                        tracing::warn!(
304                            "  Previous run configuration differs from current configuration"
305                        );
306                        tracing::warn!("  - Schema-only tables may have changed");
307                        tracing::warn!("  - Time filters may have changed");
308                        tracing::warn!("  - Table selection may have changed");
309                        tracing::warn!("  Error: {}", e);
310                        tracing::info!("");
311                        tracing::info!(
312                            "✓ Automatically discarding old checkpoint and starting fresh"
313                        );
314                        checkpoint::remove_checkpoint(&checkpoint_path)?;
315                        checkpoint::InitCheckpoint::new(
316                            checkpoint_metadata.clone(),
317                            &database_names,
318                        )
319                    }
320                }
321            }
322            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
323        }
324    } else {
325        if checkpoint_path.exists() {
326            tracing::info!(
327                "--no-resume supplied: discarding previous checkpoint at {}",
328                checkpoint_path.display()
329            );
330        }
331        checkpoint::remove_checkpoint(&checkpoint_path)?;
332        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
333    };
334
335    // Persist baseline state so crashes before first database can resume cleanly
336    checkpoint_state
337        .save(&checkpoint_path)
338        .context("Failed to persist checkpoint state")?;
339
340    tracing::info!("Found {} database(s) to replicate", databases.len());
341
342    // Estimate database sizes and get confirmation
343    if !skip_confirmation {
344        tracing::info!("Analyzing database sizes...");
345        let size_estimates = {
346            // Scope the connection so it's dropped after size estimation
347            let source_client = postgres::connect_with_retry(source_url).await?;
348            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
349                .await?
350        }; // Connection dropped here
351
352        if !confirm_replication(&size_estimates)? {
353            bail!("Replication cancelled by user");
354        }
355    }
356
357    // Step 4: Replicate each database
358    tracing::info!("Step 4/4: Replicating databases...");
359    for (idx, db_info) in databases.iter().enumerate() {
360        let filtered_tables = filter.predicate_tables(&db_info.name);
361        if checkpoint_state.is_completed(&db_info.name) {
362            tracing::info!(
363                "Skipping database '{}' (already completed per checkpoint)",
364                db_info.name
365            );
366            continue;
367        }
368        tracing::info!(
369            "Replicating database {}/{}: '{}'",
370            idx + 1,
371            databases.len(),
372            db_info.name
373        );
374
375        // Build connection URLs for this specific database
376        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
377        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;
378
379        // Handle database creation atomically to avoid TOCTOU race condition
380        // Scope the connection so it's dropped before dump/restore subprocess operations
381        {
382            let target_client = postgres::connect_with_retry(target_url).await?;
383
384            // Validate database name to prevent SQL injection
385            crate::utils::validate_postgres_identifier(&db_info.name)
386                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;
387
388            // Try to create database atomically (avoids TOCTOU vulnerability)
389            let create_query = format!(
390                "CREATE DATABASE {}",
391                crate::utils::quote_ident(&db_info.name)
392            );
393            match target_client.execute(&create_query, &[]).await {
394                Ok(_) => {
395                    tracing::info!("  Created database '{}'", db_info.name);
396                }
397                Err(err) => {
398                    // Check if error is "database already exists" (error code 42P04)
399                    if let Some(db_error) = err.as_db_error() {
400                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
401                            // Database already exists - handle based on user preferences
402                            tracing::info!(
403                                "  Database '{}' already exists on target",
404                                db_info.name
405                            );
406
407                            // Check if empty
408                            if database_is_empty(target_url, &db_info.name).await? {
409                                tracing::info!(
410                                    "  Database '{}' is empty, proceeding with restore",
411                                    db_info.name
412                                );
413                            } else {
414                                // Database exists and has data
415                                let should_drop = if drop_existing {
416                                    // Force drop with --drop-existing flag
417                                    true
418                                } else if skip_confirmation {
419                                    // Auto-confirm drop with --yes flag (non-interactive)
420                                    tracing::info!(
421                                        "  Auto-confirming drop for database '{}' (--yes flag)",
422                                        db_info.name
423                                    );
424                                    true
425                                } else {
426                                    // Interactive mode: prompt user
427                                    prompt_drop_database(&db_info.name)?
428                                };
429
430                                if should_drop {
431                                    drop_database_if_exists(&target_client, &db_info.name).await?;
432
433                                    // Recreate the database
434                                    let create_query = format!(
435                                        "CREATE DATABASE {}",
436                                        crate::utils::quote_ident(&db_info.name)
437                                    );
438                                    target_client
439                                        .execute(&create_query, &[])
440                                        .await
441                                        .with_context(|| {
442                                            format!(
443                                                "Failed to create database '{}' after drop",
444                                                db_info.name
445                                            )
446                                        })?;
447                                    tracing::info!("  Created database '{}'", db_info.name);
448                                } else {
449                                    bail!("Aborted: Database '{}' already exists", db_info.name);
450                                }
451                            }
452                        } else {
453                            // Some other database error - propagate it
454                            return Err(err).with_context(|| {
455                                format!("Failed to create database '{}'", db_info.name)
456                            });
457                        }
458                    } else {
459                        // Not a database error - propagate it
460                        return Err(err).with_context(|| {
461                            format!("Failed to create database '{}'", db_info.name)
462                        });
463                    }
464                }
465            }
466        } // Connection dropped here before dump/restore operations
467
468        // Dump and restore schema
469        tracing::info!("  Dumping schema for '{}'...", db_info.name);
470        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
471        migration::dump_schema(
472            &source_db_url,
473            &db_info.name,
474            schema_file.to_str().unwrap(),
475            &filter,
476        )
477        .await?;
478
479        tracing::info!("  Restoring schema for '{}'...", db_info.name);
480        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;
481
482        // Dump and restore data (using directory format for parallel operations)
483        tracing::info!("  Dumping data for '{}'...", db_info.name);
484        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
485        migration::dump_data(
486            &source_db_url,
487            &db_info.name,
488            data_dir.to_str().unwrap(),
489            &filter,
490        )
491        .await?;
492
493        tracing::info!("  Restoring data for '{}'...", db_info.name);
494        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;
495
496        if !filtered_tables.is_empty() {
497            tracing::info!(
498                "  Applying filtered replication for {} table(s)...",
499                filtered_tables.len()
500            );
501            migration::filtered::copy_filtered_tables(
502                &source_db_url,
503                &target_db_url,
504                &filtered_tables,
505            )
506            .await?;
507        }
508
509        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);
510
511        checkpoint_state.mark_completed(&db_info.name);
512        checkpoint_state
513            .save(&checkpoint_path)
514            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
515    }
516
517    // Explicitly clean up temp directory
518    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
519    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
520        tracing::warn!("Failed to clean up temp directory: {}", e);
521        // Don't fail the entire operation if cleanup fails
522    }
523
524    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
525        tracing::warn!("Failed to remove checkpoint state: {}", err);
526    }
527
528    tracing::info!("✅ Initial replication complete");
529
530    // Check wal_level before attempting to set up sync
531    let mut should_enable_sync = enable_sync;
532    if enable_sync {
533        tracing::info!("Checking target wal_level for logical replication...");
534        let target_wal_level = {
535            // Scope the connection for quick wal_level check
536            let target_client = postgres::connect_with_retry(target_url).await?;
537            postgres::check_wal_level(&target_client).await?
538        }; // Connection dropped here
539
540        if target_wal_level != "logical" {
541            tracing::warn!("");
542            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
543            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
544            tracing::warn!("");
545            tracing::warn!("  To fix this:");
546            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
547            tracing::warn!("    2. Restart PostgreSQL server");
548            tracing::warn!(
549                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
550            );
551            tracing::warn!("");
552            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
553            should_enable_sync = false;
554        }
555    }
556
557    // Set up continuous logical replication if enabled
558    if should_enable_sync {
559        tracing::info!("");
560        tracing::info!("========================================");
561        tracing::info!("Step 5/5: Setting up continuous replication...");
562        tracing::info!("========================================");
563        tracing::info!("");
564
565        // Call sync command with the same filter
566        crate::commands::sync(
567            source_url,
568            target_url,
569            Some(filter),
570            None,
571            None,
572            None,
573            false,
574        )
575        .await
576        .context("Failed to set up continuous replication")?;
577
578        tracing::info!("");
579        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
580    } else {
581        tracing::info!("");
582        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
583        tracing::info!("  To enable it later, run:");
584        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
585    }
586
587    Ok(())
588}
589
590/// Replace the database name in a connection URL
591fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
592    // Parse URL to find database name
593    // Format: postgresql://user:pass@host:port/database?params
594
595    // Split by '?' to separate params
596    let parts: Vec<&str> = url.split('?').collect();
597    let base_url = parts[0];
598    let params = if parts.len() > 1 {
599        Some(parts[1])
600    } else {
601        None
602    };
603
604    // Split base by '/' to get everything before database name
605    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
606    if url_parts.len() != 2 {
607        anyhow::bail!("Invalid connection URL format");
608    }
609
610    // Reconstruct URL with new database name
611    let mut new_url = format!("{}/{}", url_parts[1], new_database);
612    if let Some(p) = params {
613        new_url = format!("{}?{}", new_url, p);
614    }
615
616    Ok(new_url)
617}
618
619/// Display database size estimates and prompt for confirmation
620///
621/// Shows a table with database names, sizes, and estimated replication times.
622/// Prompts the user to proceed with the replication.
623///
624/// # Arguments
625///
626/// * `sizes` - Vector of database size estimates
627///
628/// # Returns
629///
630/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
631///
632/// # Errors
633///
634/// Returns an error if stdin/stdout operations fail.
635fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
636    use std::time::Duration;
637
638    // Calculate totals
639    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
640    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
641
642    // Print table header
643    println!();
644    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
645    println!("{}", "─".repeat(50));
646
647    // Print each database
648    for size in sizes {
649        println!(
650            "{:<20} {:<12} {:<15}",
651            size.name,
652            size.size_human,
653            migration::format_duration(size.estimated_duration)
654        );
655    }
656
657    // Print totals
658    println!("{}", "─".repeat(50));
659    println!(
660        "Total: {} (estimated {})",
661        migration::format_bytes(total_bytes),
662        migration::format_duration(total_duration)
663    );
664    println!();
665
666    // Prompt for confirmation
667    print!("Proceed with replication? [y/N]: ");
668    io::stdout().flush()?;
669
670    let mut input = String::new();
671    io::stdin()
672        .read_line(&mut input)
673        .context("Failed to read user input")?;
674
675    Ok(input.trim().to_lowercase() == "y")
676}
677
678/// Checks if a database is empty (no user tables)
679async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
680    // Need to connect to the specific database to check tables
681    let db_url = replace_database_in_url(target_url, db_name)?;
682    let client = postgres::connect_with_retry(&db_url).await?;
683
684    let query = "
685        SELECT COUNT(*)
686        FROM information_schema.tables
687        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
688    ";
689
690    let row = client.query_one(query, &[]).await?;
691    let count: i64 = row.get(0);
692
693    Ok(count == 0)
694}
695
696/// Prompts user to drop existing database
697fn prompt_drop_database(db_name: &str) -> Result<bool> {
698    use std::io::{self, Write};
699
700    print!(
701        "\nWarning: Database '{}' already exists on target and contains data.\n\
702         Drop and recreate database? This will delete all existing data. [y/N]: ",
703        db_name
704    );
705    io::stdout().flush()?;
706
707    let mut input = String::new();
708    io::stdin().read_line(&mut input)?;
709
710    Ok(input.trim().eq_ignore_ascii_case("y"))
711}
712
713/// Drops a database if it exists
714async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
715    // Validate database name to prevent SQL injection
716    crate::utils::validate_postgres_identifier(db_name)
717        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
718
719    tracing::info!("  Dropping existing database '{}'...", db_name);
720
721    // Terminate existing connections to the database
722    let terminate_query = "
723        SELECT pg_terminate_backend(pid)
724        FROM pg_stat_activity
725        WHERE datname = $1 AND pid <> pg_backend_pid()
726    ";
727    target_conn.execute(terminate_query, &[&db_name]).await?;
728
729    // Drop the database
730    let drop_query = format!(
731        "DROP DATABASE IF EXISTS {}",
732        crate::utils::quote_ident(db_name)
733    );
734    target_conn
735        .execute(&drop_query, &[])
736        .await
737        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
738
739    tracing::info!("  ✓ Database '{}' dropped", db_name);
740    Ok(())
741}
742
743/// Initial replication from SQLite to PostgreSQL
744///
745/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
746/// 1. Validates SQLite file exists and is readable
747/// 2. Validates PostgreSQL target connection
748/// 3. Lists all tables from SQLite database
749/// 4. For each table:
750///    - Converts rows to JSONB format
751///    - Creates JSONB table in PostgreSQL
752///    - Batch inserts all data
753///
754/// All SQLite data is stored as JSONB with metadata:
755/// - id: Original row ID (from ID column or row number)
756/// - data: Complete row as JSON object
757/// - _source_type: "sqlite"
758/// - _migrated_at: Timestamp of migration
759///
760/// # Arguments
761///
762/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
763/// * `target_url` - PostgreSQL connection string for target (Seren) database
764///
765/// # Returns
766///
767/// Returns `Ok(())` if migration completes successfully.
768///
769/// # Errors
770///
771/// This function will return an error if:
772/// - SQLite file doesn't exist or isn't readable
773/// - Cannot connect to target PostgreSQL database
774/// - Table conversion fails
775/// - Database creation or insert operations fail
776///
777/// # Examples
778///
779/// ```no_run
780/// # use anyhow::Result;
781/// # use database_replicator::commands::init::init_sqlite_to_postgres;
782/// # async fn example() -> Result<()> {
783/// init_sqlite_to_postgres(
784///     "database.db",
785///     "postgresql://user:pass@seren.example.com/targetdb"
786/// ).await?;
787/// # Ok(())
788/// # }
789/// ```
790pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
791    tracing::info!("Starting SQLite to PostgreSQL migration...");
792
793    // Step 1: Validate SQLite file
794    tracing::info!("Step 1/4: Validating SQLite database...");
795    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
796        .context("SQLite file validation failed")?;
797    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
798
799    // Step 2: Open SQLite connection (read-only)
800    tracing::info!("Step 2/4: Opening SQLite database...");
801    let sqlite_conn =
802        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
803    tracing::info!("  ✓ SQLite database opened (read-only mode)");
804
805    // Step 3: List all tables
806    tracing::info!("Step 3/4: Discovering tables...");
807    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
808        .context("Failed to list tables from SQLite database")?;
809
810    if tables.is_empty() {
811        tracing::warn!("⚠ No tables found in SQLite database");
812        tracing::info!("✅ Migration complete (no tables to migrate)");
813        return Ok(());
814    }
815
816    tracing::info!("Found {} table(s) to migrate", tables.len());
817
818    // Connect to PostgreSQL target
819    let target_client = postgres::connect_with_retry(target_url).await?;
820    tracing::info!("  ✓ Connected to PostgreSQL target");
821
822    // Step 4: Migrate each table
823    tracing::info!("Step 4/4: Migrating tables...");
824    for (idx, table_name) in tables.iter().enumerate() {
825        tracing::info!(
826            "Migrating table {}/{}: '{}'",
827            idx + 1,
828            tables.len(),
829            table_name
830        );
831
832        // Convert SQLite table to JSONB
833        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
834            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
835
836        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
837
838        // Create JSONB table in PostgreSQL
839        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
840            .await
841            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
842
843        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
844
845        if !rows.is_empty() {
846            // Batch insert all rows
847            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
848                .await
849                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
850
851            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
852        } else {
853            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
854        }
855    }
856
857    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
858    tracing::info!(
859        "   Migrated {} table(s) from '{}' to PostgreSQL",
860        tables.len(),
861        sqlite_path
862    );
863
864    Ok(())
865}
866
867/// Initial replication from MongoDB to PostgreSQL
868///
869/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
870/// 1. Validates MongoDB connection string
871/// 2. Connects to MongoDB and verifies connection
872/// 3. Extracts database name from connection string
873/// 4. Lists all collections from MongoDB database
874/// 5. For each collection:
875///    - Converts documents to JSONB format
876///    - Creates JSONB table in PostgreSQL
877///    - Batch inserts all data
878///
879/// All MongoDB data is stored as JSONB with metadata:
880/// - id: Original _id field (ObjectId → hex, String/Int → string)
881/// - data: Complete document as JSON object
882/// - _source_type: "mongodb"
883/// - _migrated_at: Timestamp of migration
884///
885/// # Arguments
886///
887/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
888/// * `target_url` - PostgreSQL connection string for target (Seren) database
889///
890/// # Returns
891///
892/// Returns `Ok(())` if migration completes successfully.
893///
894/// # Errors
895///
896/// This function will return an error if:
897/// - MongoDB connection string is invalid
898/// - Cannot connect to MongoDB database
899/// - Database name is not specified in connection string
900/// - Cannot connect to target PostgreSQL database
901/// - Collection conversion fails
902/// - Database creation or insert operations fail
903///
904/// # Examples
905///
906/// ```no_run
907/// # use anyhow::Result;
908/// # use database_replicator::commands::init::init_mongodb_to_postgres;
909/// # async fn example() -> Result<()> {
910/// init_mongodb_to_postgres(
911///     "mongodb://localhost:27017/mydb",
912///     "postgresql://user:pass@seren.example.com/targetdb"
913/// ).await?;
914/// # Ok(())
915/// # }
916/// ```
917pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
918    tracing::info!("Starting MongoDB to PostgreSQL migration...");
919
920    // Step 1: Validate and connect to MongoDB
921    tracing::info!("Step 1/5: Validating MongoDB connection...");
922    let client = crate::mongodb::connect_mongodb(mongo_url)
923        .await
924        .context("MongoDB connection failed")?;
925    tracing::info!("  ✓ MongoDB connection validated");
926
927    // Step 2: Extract database name
928    tracing::info!("Step 2/5: Extracting database name...");
929    let db_name = crate::mongodb::extract_database_name(mongo_url)
930        .await
931        .context("Failed to parse MongoDB connection string")?
932        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
933    tracing::info!("  ✓ Database name: '{}'", db_name);
934
935    // Step 3: List all collections
936    tracing::info!("Step 3/5: Discovering collections...");
937    let db = client.database(&db_name);
938    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
939        .await
940        .context("Failed to list collections from MongoDB database")?;
941
942    if collections.is_empty() {
943        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
944        tracing::info!("✅ Migration complete (no collections to migrate)");
945        return Ok(());
946    }
947
948    tracing::info!("Found {} collection(s) to migrate", collections.len());
949
950    // Step 4: Connect to PostgreSQL target
951    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
952    let target_client = postgres::connect_with_retry(target_url).await?;
953    tracing::info!("  ✓ Connected to PostgreSQL target");
954
955    // Step 5: Migrate each collection
956    tracing::info!("Step 5/5: Migrating collections...");
957    for (idx, collection_name) in collections.iter().enumerate() {
958        tracing::info!(
959            "Migrating collection {}/{}: '{}'",
960            idx + 1,
961            collections.len(),
962            collection_name
963        );
964
965        // Convert MongoDB collection to JSONB
966        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
967            .await
968            .with_context(|| {
969                format!(
970                    "Failed to convert collection '{}' to JSONB",
971                    collection_name
972                )
973            })?;
974
975        tracing::info!(
976            "  ✓ Converted {} documents from '{}'",
977            rows.len(),
978            collection_name
979        );
980
981        // Create JSONB table in PostgreSQL
982        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
983            .await
984            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
985
986        tracing::info!(
987            "  ✓ Created JSONB table '{}' in PostgreSQL",
988            collection_name
989        );
990
991        if !rows.is_empty() {
992            // Batch insert all rows
993            crate::jsonb::writer::insert_jsonb_batch(
994                &target_client,
995                collection_name,
996                rows,
997                "mongodb",
998            )
999            .await
1000            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1001
1002            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1003        } else {
1004            tracing::info!(
1005                "  ✓ Collection '{}' is empty (no documents to insert)",
1006                collection_name
1007            );
1008        }
1009    }
1010
1011    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1012    tracing::info!(
1013        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1014        collections.len(),
1015        db_name
1016    );
1017
1018    Ok(())
1019}
1020
1021/// Initial replication from MySQL to PostgreSQL
1022///
1023/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1024/// 1. Validates MySQL connection string
1025/// 2. Connects to MySQL and verifies connection
1026/// 3. Extracts database name from connection string
1027/// 4. Lists all tables from MySQL database
1028/// 5. For each table:
1029///    - Converts rows to JSONB format
1030///    - Creates JSONB table in PostgreSQL
1031///    - Batch inserts all data
1032///
1033/// All MySQL data is stored as JSONB with metadata:
1034/// - id: Primary key or auto-generated ID
1035/// - data: Complete row as JSON object
1036/// - _source_type: "mysql"
1037/// - _migrated_at: Timestamp of replication
1038///
1039/// # Arguments
1040///
1041/// * `mysql_url` - MySQL connection string (mysql://...)
1042/// * `target_url` - PostgreSQL connection string for target (Seren) database
1043///
1044/// # Returns
1045///
1046/// Returns `Ok(())` if replication completes successfully.
1047///
1048/// # Errors
1049///
1050/// This function will return an error if:
1051/// - MySQL connection string is invalid
1052/// - Cannot connect to MySQL database
1053/// - Database name is not specified in connection string
1054/// - Cannot connect to target PostgreSQL database
1055/// - Table conversion fails
1056/// - Database creation or insert operations fail
1057///
1058/// # Examples
1059///
1060/// ```no_run
1061/// # use anyhow::Result;
1062/// # use database_replicator::commands::init::init_mysql_to_postgres;
1063/// # async fn example() -> Result<()> {
1064/// init_mysql_to_postgres(
1065///     "mysql://user:pass@localhost:3306/mydb",
1066///     "postgresql://user:pass@seren.example.com/targetdb"
1067/// ).await?;
1068/// # Ok(())
1069/// # }
1070/// ```
1071pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1072    tracing::info!("Starting MySQL to PostgreSQL replication...");
1073
1074    // Step 1: Validate and connect to MySQL
1075    tracing::info!("Step 1/5: Validating MySQL connection...");
1076    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1077        .await
1078        .context("MySQL connection failed")?;
1079    tracing::info!("  ✓ MySQL connection validated");
1080
1081    // Step 2: Extract database name
1082    tracing::info!("Step 2/5: Extracting database name...");
1083    let db_name = crate::mysql::extract_database_name(mysql_url)
1084        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1085    tracing::info!("  ✓ Database name: '{}'", db_name);
1086
1087    // Step 3: List all tables
1088    tracing::info!("Step 3/5: Discovering tables...");
1089    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1090        .await
1091        .context("Failed to list tables from MySQL database")?;
1092
1093    if tables.is_empty() {
1094        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1095        tracing::info!("✅ Replication complete (no tables to replicate)");
1096        return Ok(());
1097    }
1098
1099    tracing::info!("Found {} table(s) to replicate", tables.len());
1100
1101    // Step 4: Connect to PostgreSQL target
1102    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1103    let target_client = postgres::connect_with_retry(target_url).await?;
1104    tracing::info!("  ✓ Connected to PostgreSQL target");
1105
1106    // Step 5: Replicate each table
1107    tracing::info!("Step 5/5: Replicating tables...");
1108    for (idx, table_name) in tables.iter().enumerate() {
1109        tracing::info!(
1110            "Replicating table {}/{}: '{}'",
1111            idx + 1,
1112            tables.len(),
1113            table_name
1114        );
1115
1116        // Convert MySQL table to JSONB
1117        let rows =
1118            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1119                .await
1120                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1121
1122        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1123
1124        // Create JSONB table in PostgreSQL
1125        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1126            .await
1127            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1128
1129        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1130
1131        if !rows.is_empty() {
1132            // Batch insert all rows
1133            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1134                .await
1135                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1136
1137            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1138        } else {
1139            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1140        }
1141    }
1142
1143    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1144    tracing::info!(
1145        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1146        tables.len(),
1147        db_name
1148    );
1149
1150    Ok(())
1151}
1152
#[cfg(test)]
mod tests {
    use super::*;

    // Integration test: requires live source and target databases, so it is
    // #[ignore]d by default and must be run explicitly (cargo test -- --ignored).
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        // Connection strings come from the environment; unwrap() is acceptable
        // in a deliberately-run test where these variables must be set.
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        // NOTE(review): the boolean arguments to `init` are positional — confirm
        // this ordering against the current signature; `init` is defined above
        // this chunk and its parameter order is not visible here.
        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    // Pure unit test: verifies database-name substitution in connection URLs,
    // both with and without trailing query parameters.
    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    // Integration smoke test: only checks that database_is_empty() completes
    // without error against a live target; it does not assert the boolean result.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables
        // This test just verifies the function doesn't crash
        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}