database_replicator/commands/init.rs

// ABOUTME: Initial replication command for snapshot schema and data copy
// ABOUTME: Performs full database dump and restore from source to target

use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

/// Initial replication command for snapshot schema and data copy
///
/// Performs a full database dump and restore from source to target in the following steps:
/// 1. Estimates database sizes and replication times
/// 2. Prompts for confirmation (unless skip_confirmation is true)
/// 3. Dumps global objects (roles, tablespaces) from source
/// 4. Restores global objects to target
/// 5. Discovers all user databases on source
/// 6. Replicates each database (schema and data)
/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
///
/// Uses a temporary directory for dump files, which is automatically cleaned up.
///
/// # Arguments
///
/// * `source_url` - PostgreSQL connection string for source (Neon) database
/// * `target_url` - PostgreSQL connection string for target (Seren) database
/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
/// * `filter` - Database and table filtering rules
/// * `drop_existing` - Drop existing databases on target before copying
/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
/// * `allow_resume` - Resume from checkpoint if available (default: true)
/// * `force_local` - If true, --local was explicitly set (fail instead of falling back to remote)
///
/// # Returns
///
/// Returns `Ok(())` if replication completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - Cannot create temporary directory
/// - Global objects dump/restore fails
/// - Cannot connect to source database
/// - Database discovery fails
/// - Any database replication fails
/// - User declines confirmation prompt
/// - Logical replication setup fails (if enable_sync is true)
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init;
/// # use database_replicator::filters::ReplicationFilter;
/// # async fn example() -> Result<()> {
/// // With confirmation prompt and automatic sync (default)
/// init(
///     "postgresql://user:pass@neon.tech/sourcedb",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     false,
///     ReplicationFilter::empty(),
///     false,
///     true,   // Enable continuous replication
///     true,   // Allow resume
///     false,  // Not forcing local execution
/// ).await?;
///
/// // Snapshot only (no continuous replication)
/// init(
///     "postgresql://user:pass@neon.tech/sourcedb",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     true,
///     ReplicationFilter::empty(),
///     false,
///     false,  // Disable continuous replication
///     true,   // Allow resume
///     true,   // Force local execution (--local flag)
/// ).await?;
/// # Ok(())
/// # }
/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replication doesn't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag or wal_level not 'logical')");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

/// Replace the database name in a connection URL
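///
/// # Examples
///
/// A minimal usage sketch mirroring the unit test at the bottom of this file
/// (marked `ignore` because this helper is private and is not compiled as a doctest):
///
/// ```ignore
/// let rewritten = replace_database_in_url(
///     "postgresql://user:pass@host:5432/olddb?sslmode=require",
///     "newdb",
/// )?;
/// assert_eq!(
///     rewritten,
///     "postgresql://user:pass@host:5432/newdb?sslmode=require"
/// );
/// ```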
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    // Parse URL to find database name
    // Format: postgresql://user:pass@host:port/database?params

    // Split by '?' to separate params
    let parts: Vec<&str> = url.split('?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 {
        Some(parts[1])
    } else {
        None
    };

    // Split base by '/' to get everything before database name
    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    // Reconstruct URL with new database name
    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

/// Display database size estimates and prompt for confirmation
///
/// Shows a table with database names, sizes, and estimated replication times.
/// Prompts the user to proceed with the replication.
///
/// # Arguments
///
/// * `sizes` - Vector of database size estimates
///
/// # Returns
///
/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
///
/// # Errors
///
/// Returns an error if stdin/stdout operations fail.
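///
/// # Examples
///
/// An illustrative rendering of the prompt produced by the format strings
/// below; the database names, sizes, and durations shown are hypothetical:
///
/// ```text
/// Database             Size         Est. Time
/// ──────────────────────────────────────────────────
/// app_db               2.1 GB       6m 30s
/// analytics            480 MB       1m 10s
/// ──────────────────────────────────────────────────
/// Total: 2.6 GB (estimated 7m 40s)
///
/// Proceed with replication? [y/N]:
/// ```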
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    // Calculate totals
    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    // Print table header
    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    // Print each database
    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    // Print totals
    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    // Prompt for confirmation
    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

/// Checks if a database is empty (no user tables)
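///
/// A brief usage sketch (marked `ignore`: this helper is private and needs a
/// live target; the database name "analytics" is a hypothetical example):
///
/// ```ignore
/// if database_is_empty(target_url, "analytics").await? {
///     // No user tables yet, so the restore can proceed without prompting.
/// }
/// ```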
async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
    // Need to connect to the specific database to check tables
    let db_url = replace_database_in_url(target_url, db_name)?;
    let client = postgres::connect_with_retry(&db_url).await?;

    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = client.query_one(query, &[]).await?;
    let count: i64 = row.get(0);

    Ok(count == 0)
}

/// Prompts user to drop existing database
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    use std::io::{self, Write};

    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

/// Drops a database if it exists
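///
/// A minimal call sketch (marked `ignore`: it needs an admin connection to the
/// target cluster, and the database name "analytics" is a hypothetical example):
///
/// ```ignore
/// let admin = postgres::connect_with_retry(target_url).await?;
/// drop_database_if_exists(&admin, "analytics").await?;
/// ```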
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Validate database name to prevent SQL injection
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!("  Dropping existing database '{}'...", db_name);

    // Terminate existing connections to the database
    let terminate_query = "
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = $1 AND pid <> pg_backend_pid()
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Drop the database
    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!("  ✓ Database '{}' dropped", db_name);
    Ok(())
}

/// Initial replication from SQLite to PostgreSQL
///
/// Performs one-time migration of a SQLite database to a PostgreSQL target using JSONB storage:
/// 1. Validates SQLite file exists and is readable
/// 2. Validates PostgreSQL target connection
/// 3. Lists all tables from SQLite database
/// 4. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All SQLite data is stored as JSONB with metadata:
/// - id: Original row ID (from ID column or row number)
/// - data: Complete row as JSON object
/// - _source_type: "sqlite"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - SQLite file doesn't exist or isn't readable
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_sqlite_to_postgres;
/// # async fn example() -> Result<()> {
/// init_sqlite_to_postgres(
///     "database.db",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    // Step 1: Validate SQLite file
    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());

    // Step 2: Open SQLite connection (read-only)
    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!("  ✓ SQLite database opened (read-only mode)");

    // Step 3: List all tables
    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    // Connect to PostgreSQL target
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 4: Migrate each table
    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert SQLite table to JSONB
        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}

/// Initial replication from MongoDB to PostgreSQL
///
/// Performs one-time migration of a MongoDB database to a PostgreSQL target using JSONB storage:
/// 1. Validates MongoDB connection string
/// 2. Connects to MongoDB and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all collections from MongoDB database
/// 5. For each collection:
///    - Converts documents to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MongoDB data is stored as JSONB with metadata:
/// - id: Original _id field (ObjectId → hex, String/Int → string)
/// - data: Complete document as JSON object
/// - _source_type: "mongodb"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MongoDB connection string is invalid
/// - Cannot connect to MongoDB database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Collection conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mongodb_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mongodb_to_postgres(
///     "mongodb://localhost:27017/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    // Step 1: Validate and connect to MongoDB
    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!("  ✓ MongoDB connection validated");

    // Step 2: Extract database name
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all collections
    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Migrate each collection
    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // Convert MongoDB collection to JSONB
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            "  ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            "  ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                "  ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

/// Initial replication from MySQL to PostgreSQL
///
/// Performs one-time replication of a MySQL database to a PostgreSQL target using JSONB storage:
/// 1. Validates MySQL connection string
/// 2. Connects to MySQL and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all tables from MySQL database
/// 5. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MySQL data is stored as JSONB with metadata:
/// - id: Primary key or auto-generated ID
/// - data: Complete row as JSON object
/// - _source_type: "mysql"
/// - _migrated_at: Timestamp of replication
///
/// # Arguments
///
/// * `mysql_url` - MySQL connection string (mysql://...)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if replication completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MySQL connection string is invalid
/// - Cannot connect to MySQL database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mysql_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mysql_to_postgres(
///     "mysql://user:pass@localhost:3306/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    // Step 1: Validate and connect to MySQL
    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!("  ✓ MySQL connection validated");

    // Step 2: Extract database name
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all tables
    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Replicate each table
    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert MySQL table to JSONB
        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        "   Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables
        // This test just verifies the function doesn't crash
        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}