database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
/// Initial replication command for snapshot schema and data copy
///
/// Performs a full database dump and restore from source to target in steps:
/// 1. Estimates database sizes and replication times
/// 2. Prompts for confirmation (unless skip_confirmation is true)
/// 3. Dumps global objects (roles, tablespaces) from source
/// 4. Restores global objects to target
/// 5. Discovers all user databases on source
/// 6. Replicates each database (schema and data)
/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
///
/// Uses temporary directory for dump files, which is automatically cleaned up.
///
/// # Arguments
///
/// * `source_url` - PostgreSQL connection string for source (Neon) database
/// * `target_url` - PostgreSQL connection string for target (Seren) database
/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
/// * `filter` - Database and table filtering rules
/// * `drop_existing` - Drop existing databases on target before copying
/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
/// * `allow_resume` - Resume from checkpoint if available (default: true)
/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
///
/// # Returns
///
/// Returns `Ok(())` if replication completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - Cannot create temporary directory
/// - Global objects dump/restore fails
/// - Cannot connect to source database
/// - Database discovery fails
/// - Any database replication fails
/// - User declines confirmation prompt
/// - Logical replication setup fails (if enable_sync is true)
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init;
/// # use database_replicator::filters::ReplicationFilter;
/// # async fn example() -> Result<()> {
/// // With confirmation prompt and automatic sync (default)
/// init(
///     "postgresql://user:pass@neon.tech/sourcedb",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     false,
///     ReplicationFilter::empty(),
///     false,
///     true,   // Enable continuous replication
///     true,   // Allow resume
///     false,  // Not forcing local execution
/// ).await?;
///
/// // Snapshot only (no continuous replication)
/// init(
///     "postgresql://user:pass@neon.tech/sourcedb",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     true,
///     ReplicationFilter::empty(),
///     false,
///     false,  // Disable continuous replication
///     true,   // Allow resume
///     true,   // Force local execution (--local flag)
/// ).await?;
/// # Ok(())
/// # }
/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Restrict pre-flight checks to the databases the filter selects (None = all).
            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) pair so resumes
    // only apply to the same replication job.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Post-process the globals dump so it restores cleanly on a managed target:
    // tolerate duplicate roles, and strip SUPERUSER, restricted GUCs, and grants
    // the target would reject.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: clear any stale checkpoint and exit successfully.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    // Fingerprint the filter so a changed configuration invalidates old checkpoints.
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    // Load or initialize checkpoint state for resumable replication.
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables with row-level predicates (e.g. time filters) are excluded from
        // the bulk dump and copied separately after the restore below.
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after every database so an interrupted run resumes here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            // Downgrade to snapshot-only instead of failing: the snapshot already
            // completed successfully at this point.
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
587
588/// Replace the database name in a connection URL
589fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
590    // Parse URL to find database name
591    // Format: postgresql://user:pass@host:port/database?params
592
593    // Split by '?' to separate params
594    let parts: Vec<&str> = url.split('?').collect();
595    let base_url = parts[0];
596    let params = if parts.len() > 1 {
597        Some(parts[1])
598    } else {
599        None
600    };
601
602    // Split base by '/' to get everything before database name
603    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
604    if url_parts.len() != 2 {
605        anyhow::bail!("Invalid connection URL format");
606    }
607
608    // Reconstruct URL with new database name
609    let mut new_url = format!("{}/{}", url_parts[1], new_database);
610    if let Some(p) = params {
611        new_url = format!("{}?{}", new_url, p);
612    }
613
614    Ok(new_url)
615}
616
617/// Display database size estimates and prompt for confirmation
618///
619/// Shows a table with database names, sizes, and estimated replication times.
620/// Prompts the user to proceed with the replication.
621///
622/// # Arguments
623///
624/// * `sizes` - Vector of database size estimates
625///
626/// # Returns
627///
628/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
629///
630/// # Errors
631///
632/// Returns an error if stdin/stdout operations fail.
633fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
634    use std::time::Duration;
635
636    // Calculate totals
637    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
638    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
639
640    // Print table header
641    println!();
642    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
643    println!("{}", "─".repeat(50));
644
645    // Print each database
646    for size in sizes {
647        println!(
648            "{:<20} {:<12} {:<15}",
649            size.name,
650            size.size_human,
651            migration::format_duration(size.estimated_duration)
652        );
653    }
654
655    // Print totals
656    println!("{}", "─".repeat(50));
657    println!(
658        "Total: {} (estimated {})",
659        migration::format_bytes(total_bytes),
660        migration::format_duration(total_duration)
661    );
662    println!();
663
664    // Prompt for confirmation
665    print!("Proceed with replication? [y/N]: ");
666    io::stdout().flush()?;
667
668    let mut input = String::new();
669    io::stdin()
670        .read_line(&mut input)
671        .context("Failed to read user input")?;
672
673    Ok(input.trim().to_lowercase() == "y")
674}
675
676/// Checks if a database is empty (no user tables)
677async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
678    // Need to connect to the specific database to check tables
679    let db_url = replace_database_in_url(target_url, db_name)?;
680    let client = postgres::connect_with_retry(&db_url).await?;
681
682    let query = "
683        SELECT COUNT(*)
684        FROM information_schema.tables
685        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
686    ";
687
688    let row = client.query_one(query, &[]).await?;
689    let count: i64 = row.get(0);
690
691    Ok(count == 0)
692}
693
694/// Prompts user to drop existing database
695fn prompt_drop_database(db_name: &str) -> Result<bool> {
696    use std::io::{self, Write};
697
698    print!(
699        "\nWarning: Database '{}' already exists on target and contains data.\n\
700         Drop and recreate database? This will delete all existing data. [y/N]: ",
701        db_name
702    );
703    io::stdout().flush()?;
704
705    let mut input = String::new();
706    io::stdin().read_line(&mut input)?;
707
708    Ok(input.trim().eq_ignore_ascii_case("y"))
709}
710
711/// Drops a database if it exists
712async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
713    // Validate database name to prevent SQL injection
714    crate::utils::validate_postgres_identifier(db_name)
715        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
716
717    tracing::info!("  Dropping existing database '{}'...", db_name);
718
719    // Terminate existing connections to the database
720    let terminate_query = "
721        SELECT pg_terminate_backend(pid)
722        FROM pg_stat_activity
723        WHERE datname = $1 AND pid <> pg_backend_pid()
724    ";
725    target_conn.execute(terminate_query, &[&db_name]).await?;
726
727    // Drop the database
728    let drop_query = format!(
729        "DROP DATABASE IF EXISTS {}",
730        crate::utils::quote_ident(db_name)
731    );
732    target_conn
733        .execute(&drop_query, &[])
734        .await
735        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
736
737    tracing::info!("  ✓ Database '{}' dropped", db_name);
738    Ok(())
739}
740
741/// Initial replication from SQLite to PostgreSQL
742///
743/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
744/// 1. Validates SQLite file exists and is readable
745/// 2. Validates PostgreSQL target connection
746/// 3. Lists all tables from SQLite database
747/// 4. For each table:
748///    - Converts rows to JSONB format
749///    - Creates JSONB table in PostgreSQL
750///    - Batch inserts all data
751///
752/// All SQLite data is stored as JSONB with metadata:
753/// - id: Original row ID (from ID column or row number)
754/// - data: Complete row as JSON object
755/// - _source_type: "sqlite"
756/// - _migrated_at: Timestamp of migration
757///
758/// # Arguments
759///
760/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
761/// * `target_url` - PostgreSQL connection string for target (Seren) database
762///
763/// # Returns
764///
765/// Returns `Ok(())` if migration completes successfully.
766///
767/// # Errors
768///
769/// This function will return an error if:
770/// - SQLite file doesn't exist or isn't readable
771/// - Cannot connect to target PostgreSQL database
772/// - Table conversion fails
773/// - Database creation or insert operations fail
774///
775/// # Examples
776///
777/// ```no_run
778/// # use anyhow::Result;
779/// # use database_replicator::commands::init::init_sqlite_to_postgres;
780/// # async fn example() -> Result<()> {
781/// init_sqlite_to_postgres(
782///     "database.db",
783///     "postgresql://user:pass@seren.example.com/targetdb"
784/// ).await?;
785/// # Ok(())
786/// # }
787/// ```
788pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
789    tracing::info!("Starting SQLite to PostgreSQL migration...");
790
791    // Step 1: Validate SQLite file
792    tracing::info!("Step 1/4: Validating SQLite database...");
793    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
794        .context("SQLite file validation failed")?;
795    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
796
797    // Step 2: Open SQLite connection (read-only)
798    tracing::info!("Step 2/4: Opening SQLite database...");
799    let sqlite_conn =
800        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
801    tracing::info!("  ✓ SQLite database opened (read-only mode)");
802
803    // Step 3: List all tables
804    tracing::info!("Step 3/4: Discovering tables...");
805    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
806        .context("Failed to list tables from SQLite database")?;
807
808    if tables.is_empty() {
809        tracing::warn!("⚠ No tables found in SQLite database");
810        tracing::info!("✅ Migration complete (no tables to migrate)");
811        return Ok(());
812    }
813
814    tracing::info!("Found {} table(s) to migrate", tables.len());
815
816    // Connect to PostgreSQL target
817    let target_client = postgres::connect_with_retry(target_url).await?;
818    tracing::info!("  ✓ Connected to PostgreSQL target");
819
820    // Step 4: Migrate each table
821    tracing::info!("Step 4/4: Migrating tables...");
822    for (idx, table_name) in tables.iter().enumerate() {
823        tracing::info!(
824            "Migrating table {}/{}: '{}'",
825            idx + 1,
826            tables.len(),
827            table_name
828        );
829
830        // Convert SQLite table to JSONB
831        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
832            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
833
834        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
835
836        // Create JSONB table in PostgreSQL
837        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
838            .await
839            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
840
841        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
842
843        if !rows.is_empty() {
844            // Batch insert all rows
845            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
846                .await
847                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
848
849            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
850        } else {
851            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
852        }
853    }
854
855    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
856    tracing::info!(
857        "   Migrated {} table(s) from '{}' to PostgreSQL",
858        tables.len(),
859        sqlite_path
860    );
861
862    Ok(())
863}
864
865/// Initial replication from MongoDB to PostgreSQL
866///
867/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
868/// 1. Validates MongoDB connection string
869/// 2. Connects to MongoDB and verifies connection
870/// 3. Extracts database name from connection string
871/// 4. Lists all collections from MongoDB database
872/// 5. For each collection:
873///    - Converts documents to JSONB format
874///    - Creates JSONB table in PostgreSQL
875///    - Batch inserts all data
876///
877/// All MongoDB data is stored as JSONB with metadata:
878/// - id: Original _id field (ObjectId → hex, String/Int → string)
879/// - data: Complete document as JSON object
880/// - _source_type: "mongodb"
881/// - _migrated_at: Timestamp of migration
882///
883/// # Arguments
884///
885/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
886/// * `target_url` - PostgreSQL connection string for target (Seren) database
887///
888/// # Returns
889///
890/// Returns `Ok(())` if migration completes successfully.
891///
892/// # Errors
893///
894/// This function will return an error if:
895/// - MongoDB connection string is invalid
896/// - Cannot connect to MongoDB database
897/// - Database name is not specified in connection string
898/// - Cannot connect to target PostgreSQL database
899/// - Collection conversion fails
900/// - Database creation or insert operations fail
901///
902/// # Examples
903///
904/// ```no_run
905/// # use anyhow::Result;
906/// # use database_replicator::commands::init::init_mongodb_to_postgres;
907/// # async fn example() -> Result<()> {
908/// init_mongodb_to_postgres(
909///     "mongodb://localhost:27017/mydb",
910///     "postgresql://user:pass@seren.example.com/targetdb"
911/// ).await?;
912/// # Ok(())
913/// # }
914/// ```
915pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
916    tracing::info!("Starting MongoDB to PostgreSQL migration...");
917
918    // Step 1: Validate and connect to MongoDB
919    tracing::info!("Step 1/5: Validating MongoDB connection...");
920    let client = crate::mongodb::connect_mongodb(mongo_url)
921        .await
922        .context("MongoDB connection failed")?;
923    tracing::info!("  ✓ MongoDB connection validated");
924
925    // Step 2: Extract database name
926    tracing::info!("Step 2/5: Extracting database name...");
927    let db_name = crate::mongodb::extract_database_name(mongo_url)
928        .await
929        .context("Failed to parse MongoDB connection string")?
930        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
931    tracing::info!("  ✓ Database name: '{}'", db_name);
932
933    // Step 3: List all collections
934    tracing::info!("Step 3/5: Discovering collections...");
935    let db = client.database(&db_name);
936    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
937        .await
938        .context("Failed to list collections from MongoDB database")?;
939
940    if collections.is_empty() {
941        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
942        tracing::info!("✅ Migration complete (no collections to migrate)");
943        return Ok(());
944    }
945
946    tracing::info!("Found {} collection(s) to migrate", collections.len());
947
948    // Step 4: Connect to PostgreSQL target
949    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
950    let target_client = postgres::connect_with_retry(target_url).await?;
951    tracing::info!("  ✓ Connected to PostgreSQL target");
952
953    // Step 5: Migrate each collection
954    tracing::info!("Step 5/5: Migrating collections...");
955    for (idx, collection_name) in collections.iter().enumerate() {
956        tracing::info!(
957            "Migrating collection {}/{}: '{}'",
958            idx + 1,
959            collections.len(),
960            collection_name
961        );
962
963        // Convert MongoDB collection to JSONB
964        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
965            .await
966            .with_context(|| {
967                format!(
968                    "Failed to convert collection '{}' to JSONB",
969                    collection_name
970                )
971            })?;
972
973        tracing::info!(
974            "  ✓ Converted {} documents from '{}'",
975            rows.len(),
976            collection_name
977        );
978
979        // Create JSONB table in PostgreSQL
980        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
981            .await
982            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
983
984        tracing::info!(
985            "  ✓ Created JSONB table '{}' in PostgreSQL",
986            collection_name
987        );
988
989        if !rows.is_empty() {
990            // Batch insert all rows
991            crate::jsonb::writer::insert_jsonb_batch(
992                &target_client,
993                collection_name,
994                rows,
995                "mongodb",
996            )
997            .await
998            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
999
1000            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
1001        } else {
1002            tracing::info!(
1003                "  ✓ Collection '{}' is empty (no documents to insert)",
1004                collection_name
1005            );
1006        }
1007    }
1008
1009    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1010    tracing::info!(
1011        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1012        collections.len(),
1013        db_name
1014    );
1015
1016    Ok(())
1017}
1018
1019/// Initial replication from MySQL to PostgreSQL
1020///
1021/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1022/// 1. Validates MySQL connection string
1023/// 2. Connects to MySQL and verifies connection
1024/// 3. Extracts database name from connection string
1025/// 4. Lists all tables from MySQL database
1026/// 5. For each table:
1027///    - Converts rows to JSONB format
1028///    - Creates JSONB table in PostgreSQL
1029///    - Batch inserts all data
1030///
1031/// All MySQL data is stored as JSONB with metadata:
1032/// - id: Primary key or auto-generated ID
1033/// - data: Complete row as JSON object
1034/// - _source_type: "mysql"
1035/// - _migrated_at: Timestamp of replication
1036///
1037/// # Arguments
1038///
1039/// * `mysql_url` - MySQL connection string (mysql://...)
1040/// * `target_url` - PostgreSQL connection string for target (Seren) database
1041///
1042/// # Returns
1043///
1044/// Returns `Ok(())` if replication completes successfully.
1045///
1046/// # Errors
1047///
1048/// This function will return an error if:
1049/// - MySQL connection string is invalid
1050/// - Cannot connect to MySQL database
1051/// - Database name is not specified in connection string
1052/// - Cannot connect to target PostgreSQL database
1053/// - Table conversion fails
1054/// - Database creation or insert operations fail
1055///
1056/// # Examples
1057///
1058/// ```no_run
1059/// # use anyhow::Result;
1060/// # use database_replicator::commands::init::init_mysql_to_postgres;
1061/// # async fn example() -> Result<()> {
1062/// init_mysql_to_postgres(
1063///     "mysql://user:pass@localhost:3306/mydb",
1064///     "postgresql://user:pass@seren.example.com/targetdb"
1065/// ).await?;
1066/// # Ok(())
1067/// # }
1068/// ```
1069pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1070    tracing::info!("Starting MySQL to PostgreSQL replication...");
1071
1072    // Step 1: Validate and connect to MySQL
1073    tracing::info!("Step 1/5: Validating MySQL connection...");
1074    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1075        .await
1076        .context("MySQL connection failed")?;
1077    tracing::info!("  ✓ MySQL connection validated");
1078
1079    // Step 2: Extract database name
1080    tracing::info!("Step 2/5: Extracting database name...");
1081    let db_name = crate::mysql::extract_database_name(mysql_url)
1082        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1083    tracing::info!("  ✓ Database name: '{}'", db_name);
1084
1085    // Step 3: List all tables
1086    tracing::info!("Step 3/5: Discovering tables...");
1087    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1088        .await
1089        .context("Failed to list tables from MySQL database")?;
1090
1091    if tables.is_empty() {
1092        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1093        tracing::info!("✅ Replication complete (no tables to replicate)");
1094        return Ok(());
1095    }
1096
1097    tracing::info!("Found {} table(s) to replicate", tables.len());
1098
1099    // Step 4: Connect to PostgreSQL target
1100    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1101    let target_client = postgres::connect_with_retry(target_url).await?;
1102    tracing::info!("  ✓ Connected to PostgreSQL target");
1103
1104    // Step 5: Replicate each table
1105    tracing::info!("Step 5/5: Replicating tables...");
1106    for (idx, table_name) in tables.iter().enumerate() {
1107        tracing::info!(
1108            "Replicating table {}/{}: '{}'",
1109            idx + 1,
1110            tables.len(),
1111            table_name
1112        );
1113
1114        // Convert MySQL table to JSONB
1115        let rows =
1116            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1117                .await
1118                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1119
1120        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1121
1122        // Create JSONB table in PostgreSQL
1123        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1124            .await
1125            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1126
1127        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1128
1129        if !rows.is_empty() {
1130            // Batch insert all rows
1131            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1132                .await
1133                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1134
1135            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1136        } else {
1137            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1138        }
1139    }
1140
1141    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1142    tracing::info!(
1143        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1144        tables.len(),
1145        db_name
1146    );
1147
1148    Ok(())
1149}
1150
#[cfg(test)]
mod tests {
    use super::*;

    // Requires live source/target databases; run explicitly with --ignored.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let outcome =
            init(&source_url, &target_url, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        // Query parameters must survive the database-name swap.
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // A URL without query parameters works the same way.
        let without_params = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(without_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    // Requires a live target database; run explicitly with --ignored.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables
        // This test just verifies the function doesn't crash
        assert!(database_is_empty(&url, "postgres").await.is_ok());
    }
}
1191}