database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::{checkpoint, migration, postgres};
5use anyhow::{bail, Context, Result};
6use std::io::{self, Write};
7use tokio_postgres::Client;
8
9/// Initial replication command for snapshot schema and data copy
10///
11/// Performs a full database dump and restore from source to target in steps:
12/// 1. Estimates database sizes and replication times
13/// 2. Prompts for confirmation (unless skip_confirmation is true)
14/// 3. Dumps global objects (roles, tablespaces) from source
15/// 4. Restores global objects to target
16/// 5. Discovers all user databases on source
17/// 6. Replicates each database (schema and data)
18/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
19///
20/// Uses temporary directory for dump files, which is automatically cleaned up.
21///
22/// # Arguments
23///
24/// * `source_url` - PostgreSQL connection string for source (Neon) database
25/// * `target_url` - PostgreSQL connection string for target (Seren) database
26/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
27/// * `filter` - Database and table filtering rules
28/// * `drop_existing` - Drop existing databases on target before copying
29/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
30/// * `allow_resume` - Resume from checkpoint if available (default: true)
31/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
32///
33/// # Returns
34///
35/// Returns `Ok(())` if replication completes successfully.
36///
37/// # Errors
38///
39/// This function will return an error if:
40/// - Cannot create temporary directory
41/// - Global objects dump/restore fails
42/// - Cannot connect to source database
43/// - Database discovery fails
44/// - Any database replication fails
45/// - User declines confirmation prompt
46/// - Logical replication setup fails (if enable_sync is true)
47///
48/// # Examples
49///
50/// ```no_run
51/// # use anyhow::Result;
52/// # use database_replicator::commands::init;
53/// # use database_replicator::filters::ReplicationFilter;
54/// # async fn example() -> Result<()> {
55/// // With confirmation prompt and automatic sync (default)
56/// init(
57///     "postgresql://user:pass@neon.tech/sourcedb",
58///     "postgresql://user:pass@seren.example.com/targetdb",
59///     false,
60///     ReplicationFilter::empty(),
61///     false,
62///     true,   // Enable continuous replication
63///     true,   // Allow resume
64///     false,  // Not forcing local execution
65/// ).await?;
66///
67/// // Snapshot only (no continuous replication)
68/// init(
69///     "postgresql://user:pass@neon.tech/sourcedb",
70///     "postgresql://user:pass@seren.example.com/targetdb",
71///     true,
72///     ReplicationFilter::empty(),
73///     false,
74///     false,  // Disable continuous replication
75///     true,   // Allow resume
76///     true,   // Force local execution (--local flag)
77/// ).await?;
78/// # Ok(())
79/// # }
80/// ```
81#[allow(clippy::too_many_arguments)]
82pub async fn init(
83    source_url: &str,
84    target_url: &str,
85    skip_confirmation: bool,
86    filter: crate::filters::ReplicationFilter,
87    drop_existing: bool,
88    enable_sync: bool,
89    allow_resume: bool,
90    force_local: bool,
91) -> Result<()> {
92    tracing::info!("Starting initial replication...");
93
94    // Detect source database type and route to appropriate implementation
95    let source_type =
96        crate::detect_source_type(source_url).context("Failed to detect source database type")?;
97
98    match source_type {
99        crate::SourceType::PostgreSQL => {
100            // PostgreSQL to PostgreSQL replication (existing logic below)
101            tracing::info!("Source type: PostgreSQL");
102
103            // Run pre-flight checks before any destructive operations
104            tracing::info!("Running pre-flight checks...");
105
106            let databases = filter.include_databases().map(|v| v.to_vec());
107            let preflight_result = crate::preflight::run_preflight_checks(
108                source_url,
109                target_url,
110                databases.as_deref(),
111            )
112            .await?;
113
114            preflight_result.print();
115
116            if !preflight_result.all_passed() {
117                // Check if we can auto-fallback to remote
118                if preflight_result.tool_version_incompatible
119                    && crate::utils::is_serendb_target(target_url)
120                    && !force_local
121                {
122                    println!();
123                    tracing::info!(
124                        "Tool version incompatible. Switching to SerenAI cloud execution..."
125                    );
126                    // Return special error that main.rs catches to trigger remote
127                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
128                }
129
130                // Cannot auto-fallback
131                if force_local {
132                    bail!(
133                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
134                         Fix the issues above or remove --local to allow remote execution."
135                    );
136                }
137
138                bail!("Pre-flight checks failed. Fix the issues above and retry.");
139            }
140
141            println!();
142        }
143        crate::SourceType::SQLite => {
144            // SQLite to PostgreSQL migration (simpler path)
145            tracing::info!("Source type: SQLite");
146
147            // SQLite migrations don't support PostgreSQL-specific features
148            if !filter.is_empty() {
149                tracing::warn!(
150                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
151                );
152            }
153            if drop_existing {
154                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
155            }
156            if !enable_sync {
157                tracing::warn!(
158                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
159                );
160            }
161
162            return init_sqlite_to_postgres(source_url, target_url).await;
163        }
164        crate::SourceType::MongoDB => {
165            // MongoDB to PostgreSQL migration (simpler path)
166            tracing::info!("Source type: MongoDB");
167
168            // MongoDB migrations don't support PostgreSQL-specific features
169            if !filter.is_empty() {
170                tracing::warn!(
171                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
172                );
173            }
174            if drop_existing {
175                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
176            }
177            if !enable_sync {
178                tracing::warn!(
179                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
180                );
181            }
182
183            return init_mongodb_to_postgres(source_url, target_url).await;
184        }
185        crate::SourceType::MySQL => {
186            // MySQL to PostgreSQL replication (simpler path)
187            tracing::info!("Source type: MySQL");
188
189            // MySQL replications don't support PostgreSQL-specific features
190            if !filter.is_empty() {
191                tracing::warn!(
192                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
193                );
194            }
195            if drop_existing {
196                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
197            }
198            if !enable_sync {
199                tracing::warn!(
200                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
201                );
202            }
203
204            return init_mysql_to_postgres(source_url, target_url).await;
205        }
206    }
207
208    // CRITICAL: Ensure source and target are different to prevent data loss
209    crate::utils::validate_source_target_different(source_url, target_url)
210        .context("Source and target validation failed")?;
211    tracing::info!("✓ Verified source and target are different databases");
212
213    // Create managed temporary directory for dump files
214    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
215    let temp_path =
216        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
217    tracing::debug!("Using temp directory: {}", temp_path.display());
218
219    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
220        .context("Failed to determine checkpoint location")?;
221
222    // Step 1: Dump global objects
223    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
224    let globals_file = temp_path.join("globals.sql");
225    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
226    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
227        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
228    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
229        .context("Failed to remove SUPERUSER from globals dump")?;
230    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
231        .context("Failed to remove restricted parameter settings from globals dump")?;
232
233    // Step 2: Restore global objects
234    tracing::info!("Step 2/4: Restoring global objects to target...");
235    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;
236
237    // Step 3: Discover and filter databases
238    tracing::info!("Step 3/4: Discovering databases...");
239    let all_databases = {
240        // Scope the connection so it's dropped before subprocess operations
241        let source_client = postgres::connect_with_retry(source_url).await?;
242        migration::list_databases(&source_client).await?
243    }; // Connection dropped here
244
245    // Apply filtering rules
246    let databases: Vec<_> = all_databases
247        .into_iter()
248        .filter(|db| filter.should_replicate_database(&db.name))
249        .collect();
250
251    if databases.is_empty() {
252        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
253        if filter.is_empty() {
254            tracing::warn!("⚠ No user databases found on source");
255            tracing::warn!("  This is unusual - the source database appears empty");
256            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
257        } else {
258            tracing::warn!("⚠ No databases matched the filter criteria");
259            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
260        }
261        tracing::info!("✅ Initial replication complete (no databases to replicate)");
262        return Ok(());
263    }
264
265    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
266    let filter_hash = filter.fingerprint();
267    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
268        source_url,
269        target_url,
270        filter_hash,
271        drop_existing,
272        enable_sync,
273    );
274
275    let mut checkpoint_state = if allow_resume {
276        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
277            Some(existing) => {
278                // Try to validate the checkpoint
279                match existing.validate(&checkpoint_metadata, &database_names) {
280                    Ok(()) => {
281                        // Validation succeeded - resume from checkpoint
282                        if existing.completed_count() > 0 {
283                            tracing::info!(
284                                "Resume checkpoint found: {}/{} databases already replicated",
285                                existing.completed_count(),
286                                existing.total_databases()
287                            );
288                        } else {
289                            tracing::info!(
290                                "Resume checkpoint found but no databases marked complete yet"
291                            );
292                        }
293                        existing
294                    }
295                    Err(e) => {
296                        // Validation failed - log warning and start fresh
297                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
298                        tracing::warn!(
299                            "  Previous run configuration differs from current configuration"
300                        );
301                        tracing::warn!("  - Schema-only tables may have changed");
302                        tracing::warn!("  - Time filters may have changed");
303                        tracing::warn!("  - Table selection may have changed");
304                        tracing::warn!("  Error: {}", e);
305                        tracing::info!("");
306                        tracing::info!(
307                            "✓ Automatically discarding old checkpoint and starting fresh"
308                        );
309                        checkpoint::remove_checkpoint(&checkpoint_path)?;
310                        checkpoint::InitCheckpoint::new(
311                            checkpoint_metadata.clone(),
312                            &database_names,
313                        )
314                    }
315                }
316            }
317            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
318        }
319    } else {
320        if checkpoint_path.exists() {
321            tracing::info!(
322                "--no-resume supplied: discarding previous checkpoint at {}",
323                checkpoint_path.display()
324            );
325        }
326        checkpoint::remove_checkpoint(&checkpoint_path)?;
327        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
328    };
329
330    // Persist baseline state so crashes before first database can resume cleanly
331    checkpoint_state
332        .save(&checkpoint_path)
333        .context("Failed to persist checkpoint state")?;
334
335    tracing::info!("Found {} database(s) to replicate", databases.len());
336
337    // Estimate database sizes and get confirmation
338    if !skip_confirmation {
339        tracing::info!("Analyzing database sizes...");
340        let size_estimates = {
341            // Scope the connection so it's dropped after size estimation
342            let source_client = postgres::connect_with_retry(source_url).await?;
343            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
344                .await?
345        }; // Connection dropped here
346
347        if !confirm_replication(&size_estimates)? {
348            bail!("Replication cancelled by user");
349        }
350    }
351
352    // Step 4: Replicate each database
353    tracing::info!("Step 4/4: Replicating databases...");
354    for (idx, db_info) in databases.iter().enumerate() {
355        let filtered_tables = filter.predicate_tables(&db_info.name);
356        if checkpoint_state.is_completed(&db_info.name) {
357            tracing::info!(
358                "Skipping database '{}' (already completed per checkpoint)",
359                db_info.name
360            );
361            continue;
362        }
363        tracing::info!(
364            "Replicating database {}/{}: '{}'",
365            idx + 1,
366            databases.len(),
367            db_info.name
368        );
369
370        // Build connection URLs for this specific database
371        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
372        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;
373
374        // Handle database creation atomically to avoid TOCTOU race condition
375        // Scope the connection so it's dropped before dump/restore subprocess operations
376        {
377            let target_client = postgres::connect_with_retry(target_url).await?;
378
379            // Validate database name to prevent SQL injection
380            crate::utils::validate_postgres_identifier(&db_info.name)
381                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;
382
383            // Try to create database atomically (avoids TOCTOU vulnerability)
384            let create_query = format!(
385                "CREATE DATABASE {}",
386                crate::utils::quote_ident(&db_info.name)
387            );
388            match target_client.execute(&create_query, &[]).await {
389                Ok(_) => {
390                    tracing::info!("  Created database '{}'", db_info.name);
391                }
392                Err(err) => {
393                    // Check if error is "database already exists" (error code 42P04)
394                    if let Some(db_error) = err.as_db_error() {
395                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
396                            // Database already exists - handle based on user preferences
397                            tracing::info!(
398                                "  Database '{}' already exists on target",
399                                db_info.name
400                            );
401
402                            // Check if empty
403                            if database_is_empty(target_url, &db_info.name).await? {
404                                tracing::info!(
405                                    "  Database '{}' is empty, proceeding with restore",
406                                    db_info.name
407                                );
408                            } else {
409                                // Database exists and has data
410                                let should_drop = if drop_existing {
411                                    // Force drop with --drop-existing flag
412                                    true
413                                } else if skip_confirmation {
414                                    // Auto-confirm drop with --yes flag (non-interactive)
415                                    tracing::info!(
416                                        "  Auto-confirming drop for database '{}' (--yes flag)",
417                                        db_info.name
418                                    );
419                                    true
420                                } else {
421                                    // Interactive mode: prompt user
422                                    prompt_drop_database(&db_info.name)?
423                                };
424
425                                if should_drop {
426                                    drop_database_if_exists(&target_client, &db_info.name).await?;
427
428                                    // Recreate the database
429                                    let create_query = format!(
430                                        "CREATE DATABASE {}",
431                                        crate::utils::quote_ident(&db_info.name)
432                                    );
433                                    target_client
434                                        .execute(&create_query, &[])
435                                        .await
436                                        .with_context(|| {
437                                            format!(
438                                                "Failed to create database '{}' after drop",
439                                                db_info.name
440                                            )
441                                        })?;
442                                    tracing::info!("  Created database '{}'", db_info.name);
443                                } else {
444                                    bail!("Aborted: Database '{}' already exists", db_info.name);
445                                }
446                            }
447                        } else {
448                            // Some other database error - propagate it
449                            return Err(err).with_context(|| {
450                                format!("Failed to create database '{}'", db_info.name)
451                            });
452                        }
453                    } else {
454                        // Not a database error - propagate it
455                        return Err(err).with_context(|| {
456                            format!("Failed to create database '{}'", db_info.name)
457                        });
458                    }
459                }
460            }
461        } // Connection dropped here before dump/restore operations
462
463        // Dump and restore schema
464        tracing::info!("  Dumping schema for '{}'...", db_info.name);
465        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
466        migration::dump_schema(
467            &source_db_url,
468            &db_info.name,
469            schema_file.to_str().unwrap(),
470            &filter,
471        )
472        .await?;
473
474        tracing::info!("  Restoring schema for '{}'...", db_info.name);
475        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;
476
477        // Dump and restore data (using directory format for parallel operations)
478        tracing::info!("  Dumping data for '{}'...", db_info.name);
479        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
480        migration::dump_data(
481            &source_db_url,
482            &db_info.name,
483            data_dir.to_str().unwrap(),
484            &filter,
485        )
486        .await?;
487
488        tracing::info!("  Restoring data for '{}'...", db_info.name);
489        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;
490
491        if !filtered_tables.is_empty() {
492            tracing::info!(
493                "  Applying filtered replication for {} table(s)...",
494                filtered_tables.len()
495            );
496            migration::filtered::copy_filtered_tables(
497                &source_db_url,
498                &target_db_url,
499                &filtered_tables,
500            )
501            .await?;
502        }
503
504        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);
505
506        checkpoint_state.mark_completed(&db_info.name);
507        checkpoint_state
508            .save(&checkpoint_path)
509            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
510    }
511
512    // Explicitly clean up temp directory
513    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
514    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
515        tracing::warn!("Failed to clean up temp directory: {}", e);
516        // Don't fail the entire operation if cleanup fails
517    }
518
519    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
520        tracing::warn!("Failed to remove checkpoint state: {}", err);
521    }
522
523    tracing::info!("✅ Initial replication complete");
524
525    // Check wal_level before attempting to set up sync
526    let mut should_enable_sync = enable_sync;
527    if enable_sync {
528        tracing::info!("Checking target wal_level for logical replication...");
529        let target_wal_level = {
530            // Scope the connection for quick wal_level check
531            let target_client = postgres::connect_with_retry(target_url).await?;
532            postgres::check_wal_level(&target_client).await?
533        }; // Connection dropped here
534
535        if target_wal_level != "logical" {
536            tracing::warn!("");
537            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
538            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
539            tracing::warn!("");
540            tracing::warn!("  To fix this:");
541            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
542            tracing::warn!("    2. Restart PostgreSQL server");
543            tracing::warn!(
544                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
545            );
546            tracing::warn!("");
547            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
548            should_enable_sync = false;
549        }
550    }
551
552    // Set up continuous logical replication if enabled
553    if should_enable_sync {
554        tracing::info!("");
555        tracing::info!("========================================");
556        tracing::info!("Step 5/5: Setting up continuous replication...");
557        tracing::info!("========================================");
558        tracing::info!("");
559
560        // Call sync command with the same filter
561        crate::commands::sync(
562            source_url,
563            target_url,
564            Some(filter),
565            None,
566            None,
567            None,
568            false,
569        )
570        .await
571        .context("Failed to set up continuous replication")?;
572
573        tracing::info!("");
574        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
575    } else {
576        tracing::info!("");
577        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
578        tracing::info!("  To enable it later, run:");
579        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
580    }
581
582    Ok(())
583}
584
585/// Replace the database name in a connection URL
586fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
587    // Parse URL to find database name
588    // Format: postgresql://user:pass@host:port/database?params
589
590    // Split by '?' to separate params
591    let parts: Vec<&str> = url.split('?').collect();
592    let base_url = parts[0];
593    let params = if parts.len() > 1 {
594        Some(parts[1])
595    } else {
596        None
597    };
598
599    // Split base by '/' to get everything before database name
600    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
601    if url_parts.len() != 2 {
602        anyhow::bail!("Invalid connection URL format");
603    }
604
605    // Reconstruct URL with new database name
606    let mut new_url = format!("{}/{}", url_parts[1], new_database);
607    if let Some(p) = params {
608        new_url = format!("{}?{}", new_url, p);
609    }
610
611    Ok(new_url)
612}
613
614/// Display database size estimates and prompt for confirmation
615///
616/// Shows a table with database names, sizes, and estimated replication times.
617/// Prompts the user to proceed with the replication.
618///
619/// # Arguments
620///
621/// * `sizes` - Vector of database size estimates
622///
623/// # Returns
624///
625/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
626///
627/// # Errors
628///
629/// Returns an error if stdin/stdout operations fail.
630fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
631    use std::time::Duration;
632
633    // Calculate totals
634    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
635    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
636
637    // Print table header
638    println!();
639    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
640    println!("{}", "─".repeat(50));
641
642    // Print each database
643    for size in sizes {
644        println!(
645            "{:<20} {:<12} {:<15}",
646            size.name,
647            size.size_human,
648            migration::format_duration(size.estimated_duration)
649        );
650    }
651
652    // Print totals
653    println!("{}", "─".repeat(50));
654    println!(
655        "Total: {} (estimated {})",
656        migration::format_bytes(total_bytes),
657        migration::format_duration(total_duration)
658    );
659    println!();
660
661    // Prompt for confirmation
662    print!("Proceed with replication? [y/N]: ");
663    io::stdout().flush()?;
664
665    let mut input = String::new();
666    io::stdin()
667        .read_line(&mut input)
668        .context("Failed to read user input")?;
669
670    Ok(input.trim().to_lowercase() == "y")
671}
672
673/// Checks if a database is empty (no user tables)
674async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
675    // Need to connect to the specific database to check tables
676    let db_url = replace_database_in_url(target_url, db_name)?;
677    let client = postgres::connect_with_retry(&db_url).await?;
678
679    let query = "
680        SELECT COUNT(*)
681        FROM information_schema.tables
682        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
683    ";
684
685    let row = client.query_one(query, &[]).await?;
686    let count: i64 = row.get(0);
687
688    Ok(count == 0)
689}
690
691/// Prompts user to drop existing database
692fn prompt_drop_database(db_name: &str) -> Result<bool> {
693    use std::io::{self, Write};
694
695    print!(
696        "\nWarning: Database '{}' already exists on target and contains data.\n\
697         Drop and recreate database? This will delete all existing data. [y/N]: ",
698        db_name
699    );
700    io::stdout().flush()?;
701
702    let mut input = String::new();
703    io::stdin().read_line(&mut input)?;
704
705    Ok(input.trim().eq_ignore_ascii_case("y"))
706}
707
708/// Drops a database if it exists
709async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
710    // Validate database name to prevent SQL injection
711    crate::utils::validate_postgres_identifier(db_name)
712        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
713
714    tracing::info!("  Dropping existing database '{}'...", db_name);
715
716    // Terminate existing connections to the database
717    let terminate_query = "
718        SELECT pg_terminate_backend(pid)
719        FROM pg_stat_activity
720        WHERE datname = $1 AND pid <> pg_backend_pid()
721    ";
722    target_conn.execute(terminate_query, &[&db_name]).await?;
723
724    // Drop the database
725    let drop_query = format!(
726        "DROP DATABASE IF EXISTS {}",
727        crate::utils::quote_ident(db_name)
728    );
729    target_conn
730        .execute(&drop_query, &[])
731        .await
732        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
733
734    tracing::info!("  ✓ Database '{}' dropped", db_name);
735    Ok(())
736}
737
738/// Initial replication from SQLite to PostgreSQL
739///
740/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
741/// 1. Validates SQLite file exists and is readable
742/// 2. Validates PostgreSQL target connection
743/// 3. Lists all tables from SQLite database
744/// 4. For each table:
745///    - Converts rows to JSONB format
746///    - Creates JSONB table in PostgreSQL
747///    - Batch inserts all data
748///
749/// All SQLite data is stored as JSONB with metadata:
750/// - id: Original row ID (from ID column or row number)
751/// - data: Complete row as JSON object
752/// - _source_type: "sqlite"
753/// - _migrated_at: Timestamp of migration
754///
755/// # Arguments
756///
757/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
758/// * `target_url` - PostgreSQL connection string for target (Seren) database
759///
760/// # Returns
761///
762/// Returns `Ok(())` if migration completes successfully.
763///
764/// # Errors
765///
766/// This function will return an error if:
767/// - SQLite file doesn't exist or isn't readable
768/// - Cannot connect to target PostgreSQL database
769/// - Table conversion fails
770/// - Database creation or insert operations fail
771///
772/// # Examples
773///
774/// ```no_run
775/// # use anyhow::Result;
776/// # use database_replicator::commands::init::init_sqlite_to_postgres;
777/// # async fn example() -> Result<()> {
778/// init_sqlite_to_postgres(
779///     "database.db",
780///     "postgresql://user:pass@seren.example.com/targetdb"
781/// ).await?;
782/// # Ok(())
783/// # }
784/// ```
785pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
786    tracing::info!("Starting SQLite to PostgreSQL migration...");
787
788    // Step 1: Validate SQLite file
789    tracing::info!("Step 1/4: Validating SQLite database...");
790    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
791        .context("SQLite file validation failed")?;
792    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
793
794    // Step 2: Open SQLite connection (read-only)
795    tracing::info!("Step 2/4: Opening SQLite database...");
796    let sqlite_conn =
797        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
798    tracing::info!("  ✓ SQLite database opened (read-only mode)");
799
800    // Step 3: List all tables
801    tracing::info!("Step 3/4: Discovering tables...");
802    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
803        .context("Failed to list tables from SQLite database")?;
804
805    if tables.is_empty() {
806        tracing::warn!("⚠ No tables found in SQLite database");
807        tracing::info!("✅ Migration complete (no tables to migrate)");
808        return Ok(());
809    }
810
811    tracing::info!("Found {} table(s) to migrate", tables.len());
812
813    // Connect to PostgreSQL target
814    let target_client = postgres::connect_with_retry(target_url).await?;
815    tracing::info!("  ✓ Connected to PostgreSQL target");
816
817    // Step 4: Migrate each table
818    tracing::info!("Step 4/4: Migrating tables...");
819    for (idx, table_name) in tables.iter().enumerate() {
820        tracing::info!(
821            "Migrating table {}/{}: '{}'",
822            idx + 1,
823            tables.len(),
824            table_name
825        );
826
827        // Convert SQLite table to JSONB
828        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
829            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
830
831        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
832
833        // Create JSONB table in PostgreSQL
834        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
835            .await
836            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
837
838        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
839
840        if !rows.is_empty() {
841            // Batch insert all rows
842            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
843                .await
844                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
845
846            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
847        } else {
848            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
849        }
850    }
851
852    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
853    tracing::info!(
854        "   Migrated {} table(s) from '{}' to PostgreSQL",
855        tables.len(),
856        sqlite_path
857    );
858
859    Ok(())
860}
861
862/// Initial replication from MongoDB to PostgreSQL
863///
864/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
865/// 1. Validates MongoDB connection string
866/// 2. Connects to MongoDB and verifies connection
867/// 3. Extracts database name from connection string
868/// 4. Lists all collections from MongoDB database
869/// 5. For each collection:
870///    - Converts documents to JSONB format
871///    - Creates JSONB table in PostgreSQL
872///    - Batch inserts all data
873///
874/// All MongoDB data is stored as JSONB with metadata:
875/// - id: Original _id field (ObjectId → hex, String/Int → string)
876/// - data: Complete document as JSON object
877/// - _source_type: "mongodb"
878/// - _migrated_at: Timestamp of migration
879///
880/// # Arguments
881///
882/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
883/// * `target_url` - PostgreSQL connection string for target (Seren) database
884///
885/// # Returns
886///
887/// Returns `Ok(())` if migration completes successfully.
888///
889/// # Errors
890///
891/// This function will return an error if:
892/// - MongoDB connection string is invalid
893/// - Cannot connect to MongoDB database
894/// - Database name is not specified in connection string
895/// - Cannot connect to target PostgreSQL database
896/// - Collection conversion fails
897/// - Database creation or insert operations fail
898///
899/// # Examples
900///
901/// ```no_run
902/// # use anyhow::Result;
903/// # use database_replicator::commands::init::init_mongodb_to_postgres;
904/// # async fn example() -> Result<()> {
905/// init_mongodb_to_postgres(
906///     "mongodb://localhost:27017/mydb",
907///     "postgresql://user:pass@seren.example.com/targetdb"
908/// ).await?;
909/// # Ok(())
910/// # }
911/// ```
912pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
913    tracing::info!("Starting MongoDB to PostgreSQL migration...");
914
915    // Step 1: Validate and connect to MongoDB
916    tracing::info!("Step 1/5: Validating MongoDB connection...");
917    let client = crate::mongodb::connect_mongodb(mongo_url)
918        .await
919        .context("MongoDB connection failed")?;
920    tracing::info!("  ✓ MongoDB connection validated");
921
922    // Step 2: Extract database name
923    tracing::info!("Step 2/5: Extracting database name...");
924    let db_name = crate::mongodb::extract_database_name(mongo_url)
925        .await
926        .context("Failed to parse MongoDB connection string")?
927        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
928    tracing::info!("  ✓ Database name: '{}'", db_name);
929
930    // Step 3: List all collections
931    tracing::info!("Step 3/5: Discovering collections...");
932    let db = client.database(&db_name);
933    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
934        .await
935        .context("Failed to list collections from MongoDB database")?;
936
937    if collections.is_empty() {
938        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
939        tracing::info!("✅ Migration complete (no collections to migrate)");
940        return Ok(());
941    }
942
943    tracing::info!("Found {} collection(s) to migrate", collections.len());
944
945    // Step 4: Connect to PostgreSQL target
946    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
947    let target_client = postgres::connect_with_retry(target_url).await?;
948    tracing::info!("  ✓ Connected to PostgreSQL target");
949
950    // Step 5: Migrate each collection
951    tracing::info!("Step 5/5: Migrating collections...");
952    for (idx, collection_name) in collections.iter().enumerate() {
953        tracing::info!(
954            "Migrating collection {}/{}: '{}'",
955            idx + 1,
956            collections.len(),
957            collection_name
958        );
959
960        // Convert MongoDB collection to JSONB
961        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
962            .await
963            .with_context(|| {
964                format!(
965                    "Failed to convert collection '{}' to JSONB",
966                    collection_name
967                )
968            })?;
969
970        tracing::info!(
971            "  ✓ Converted {} documents from '{}'",
972            rows.len(),
973            collection_name
974        );
975
976        // Create JSONB table in PostgreSQL
977        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
978            .await
979            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
980
981        tracing::info!(
982            "  ✓ Created JSONB table '{}' in PostgreSQL",
983            collection_name
984        );
985
986        if !rows.is_empty() {
987            // Batch insert all rows
988            crate::jsonb::writer::insert_jsonb_batch(
989                &target_client,
990                collection_name,
991                rows,
992                "mongodb",
993            )
994            .await
995            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
996
997            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
998        } else {
999            tracing::info!(
1000                "  ✓ Collection '{}' is empty (no documents to insert)",
1001                collection_name
1002            );
1003        }
1004    }
1005
1006    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1007    tracing::info!(
1008        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1009        collections.len(),
1010        db_name
1011    );
1012
1013    Ok(())
1014}
1015
1016/// Initial replication from MySQL to PostgreSQL
1017///
1018/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1019/// 1. Validates MySQL connection string
1020/// 2. Connects to MySQL and verifies connection
1021/// 3. Extracts database name from connection string
1022/// 4. Lists all tables from MySQL database
1023/// 5. For each table:
1024///    - Converts rows to JSONB format
1025///    - Creates JSONB table in PostgreSQL
1026///    - Batch inserts all data
1027///
1028/// All MySQL data is stored as JSONB with metadata:
1029/// - id: Primary key or auto-generated ID
1030/// - data: Complete row as JSON object
1031/// - _source_type: "mysql"
1032/// - _migrated_at: Timestamp of replication
1033///
1034/// # Arguments
1035///
1036/// * `mysql_url` - MySQL connection string (mysql://...)
1037/// * `target_url` - PostgreSQL connection string for target (Seren) database
1038///
1039/// # Returns
1040///
1041/// Returns `Ok(())` if replication completes successfully.
1042///
1043/// # Errors
1044///
1045/// This function will return an error if:
1046/// - MySQL connection string is invalid
1047/// - Cannot connect to MySQL database
1048/// - Database name is not specified in connection string
1049/// - Cannot connect to target PostgreSQL database
1050/// - Table conversion fails
1051/// - Database creation or insert operations fail
1052///
1053/// # Examples
1054///
1055/// ```no_run
1056/// # use anyhow::Result;
1057/// # use database_replicator::commands::init::init_mysql_to_postgres;
1058/// # async fn example() -> Result<()> {
1059/// init_mysql_to_postgres(
1060///     "mysql://user:pass@localhost:3306/mydb",
1061///     "postgresql://user:pass@seren.example.com/targetdb"
1062/// ).await?;
1063/// # Ok(())
1064/// # }
1065/// ```
1066pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1067    tracing::info!("Starting MySQL to PostgreSQL replication...");
1068
1069    // Step 1: Validate and connect to MySQL
1070    tracing::info!("Step 1/5: Validating MySQL connection...");
1071    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1072        .await
1073        .context("MySQL connection failed")?;
1074    tracing::info!("  ✓ MySQL connection validated");
1075
1076    // Step 2: Extract database name
1077    tracing::info!("Step 2/5: Extracting database name...");
1078    let db_name = crate::mysql::extract_database_name(mysql_url)
1079        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1080    tracing::info!("  ✓ Database name: '{}'", db_name);
1081
1082    // Step 3: List all tables
1083    tracing::info!("Step 3/5: Discovering tables...");
1084    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1085        .await
1086        .context("Failed to list tables from MySQL database")?;
1087
1088    if tables.is_empty() {
1089        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1090        tracing::info!("✅ Replication complete (no tables to replicate)");
1091        return Ok(());
1092    }
1093
1094    tracing::info!("Found {} table(s) to replicate", tables.len());
1095
1096    // Step 4: Connect to PostgreSQL target
1097    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1098    let target_client = postgres::connect_with_retry(target_url).await?;
1099    tracing::info!("  ✓ Connected to PostgreSQL target");
1100
1101    // Step 5: Replicate each table
1102    tracing::info!("Step 5/5: Replicating tables...");
1103    for (idx, table_name) in tables.iter().enumerate() {
1104        tracing::info!(
1105            "Replicating table {}/{}: '{}'",
1106            idx + 1,
1107            tables.len(),
1108            table_name
1109        );
1110
1111        // Convert MySQL table to JSONB
1112        let rows =
1113            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1114                .await
1115                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1116
1117        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1118
1119        // Create JSONB table in PostgreSQL
1120        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1121            .await
1122            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1123
1124        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1125
1126        if !rows.is_empty() {
1127            // Batch insert all rows
1128            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1129                .await
1130                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1131
1132            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1133        } else {
1134            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1135        }
1136    }
1137
1138    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1139    tracing::info!(
1140        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1141        tables.len(),
1142        db_name
1143    );
1144
1145    Ok(())
1146}
1147
#[cfg(test)]
mod tests {
    use super::*;

    /// Full snapshot replication against live databases; requires
    /// TEST_SOURCE_URL / TEST_TARGET_URL, so it is ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // skip_confirmation=true keeps the run non-interactive;
        // enable_sync=false limits the test to the snapshot phase.
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(
            init(&source, &target, true, filter, false, false, true, false)
                .await
                .is_ok()
        );
    }

    #[test]
    fn test_replace_database_in_url() {
        // Query parameters must survive the database-name swap...
        assert_eq!(
            replace_database_in_url("postgresql://user:pass@host:5432/olddb?sslmode=require", "newdb")
                .unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        // ...and the swap must also work when there are none.
        assert_eq!(
            replace_database_in_url("postgresql://user:pass@host:5432/olddb", "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    /// Smoke test only: the built-in postgres database may or may not contain
    /// user tables, so we merely require the call to succeed. Needs a live
    /// target, hence ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");
        assert!(database_is_empty(&url, "postgres").await.is_ok());
    }
}