database_replicator/commands/
init.rs

1// ABOUTME: Initial replication command for snapshot schema and data copy
2// ABOUTME: Performs full database dump and restore from source to target
3
4use crate::{checkpoint, migration, postgres};
5use anyhow::{bail, Context, Result};
6use std::io::{self, Write};
7use tokio_postgres::Client;
8
9/// Initial replication command for snapshot schema and data copy
10///
11/// Performs a full database dump and restore from source to target in steps:
12/// 1. Estimates database sizes and replication times
13/// 2. Prompts for confirmation (unless skip_confirmation is true)
14/// 3. Dumps global objects (roles, tablespaces) from source
15/// 4. Restores global objects to target
16/// 5. Discovers all user databases on source
17/// 6. Replicates each database (schema and data)
18/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
19///
20/// Uses temporary directory for dump files, which is automatically cleaned up.
21///
22/// # Arguments
23///
24/// * `source_url` - PostgreSQL connection string for source (Neon) database
25/// * `target_url` - PostgreSQL connection string for target (Seren) database
26/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
27/// * `filter` - Database and table filtering rules
28/// * `drop_existing` - Drop existing databases on target before copying
29/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
30/// * `allow_resume` - Resume from checkpoint if available (default: true)
31/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
32///
33/// # Returns
34///
35/// Returns `Ok(())` if replication completes successfully.
36///
37/// # Errors
38///
39/// This function will return an error if:
40/// - Cannot create temporary directory
41/// - Global objects dump/restore fails
42/// - Cannot connect to source database
43/// - Database discovery fails
44/// - Any database replication fails
45/// - User declines confirmation prompt
46/// - Logical replication setup fails (if enable_sync is true)
47///
48/// # Examples
49///
50/// ```no_run
51/// # use anyhow::Result;
52/// # use database_replicator::commands::init;
53/// # use database_replicator::filters::ReplicationFilter;
54/// # async fn example() -> Result<()> {
55/// // With confirmation prompt and automatic sync (default)
56/// init(
57///     "postgresql://user:pass@neon.tech/sourcedb",
58///     "postgresql://user:pass@seren.example.com/targetdb",
59///     false,
60///     ReplicationFilter::empty(),
61///     false,
62///     true,   // Enable continuous replication
63///     true,   // Allow resume
64///     false,  // Not forcing local execution
65/// ).await?;
66///
67/// // Snapshot only (no continuous replication)
68/// init(
69///     "postgresql://user:pass@neon.tech/sourcedb",
70///     "postgresql://user:pass@seren.example.com/targetdb",
71///     true,
72///     ReplicationFilter::empty(),
73///     false,
74///     false,  // Disable continuous replication
75///     true,   // Allow resume
76///     true,   // Force local execution (--local flag)
77/// ).await?;
78/// # Ok(())
79/// # }
80/// ```
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");

            // Run pre-flight checks before any destructive operations
            tracing::info!("Running pre-flight checks...");

            // Only the explicitly included databases (if any) are checked pre-flight
            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Check if we can auto-fallback to remote
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    // Return special error that main.rs catches to trigger remote
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                // Cannot auto-fallback
                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replications don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the source/target pair, so a rerun
    // of the same job can find and resume its previous checkpoint
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: discard any stale checkpoint (best-effort)
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    // Load or build checkpoint state: resume when a valid checkpoint exists,
    // otherwise start fresh (mismatched or --no-resume checkpoints are discarded)
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables with row-level filter predicates are copied separately after
        // the bulk dump/restore (see copy_filtered_tables below)
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Record progress after each database so a crash resumes from here
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        // NOTE(review): the trailing `None, None, None, false` take sync's
        // defaults — confirm their meaning against crate::commands::sync's signature
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
580
581/// Replace the database name in a connection URL
582fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
583    // Parse URL to find database name
584    // Format: postgresql://user:pass@host:port/database?params
585
586    // Split by '?' to separate params
587    let parts: Vec<&str> = url.split('?').collect();
588    let base_url = parts[0];
589    let params = if parts.len() > 1 {
590        Some(parts[1])
591    } else {
592        None
593    };
594
595    // Split base by '/' to get everything before database name
596    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
597    if url_parts.len() != 2 {
598        anyhow::bail!("Invalid connection URL format");
599    }
600
601    // Reconstruct URL with new database name
602    let mut new_url = format!("{}/{}", url_parts[1], new_database);
603    if let Some(p) = params {
604        new_url = format!("{}?{}", new_url, p);
605    }
606
607    Ok(new_url)
608}
609
610/// Display database size estimates and prompt for confirmation
611///
612/// Shows a table with database names, sizes, and estimated replication times.
613/// Prompts the user to proceed with the replication.
614///
615/// # Arguments
616///
617/// * `sizes` - Vector of database size estimates
618///
619/// # Returns
620///
621/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
622///
623/// # Errors
624///
625/// Returns an error if stdin/stdout operations fail.
626fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
627    use std::time::Duration;
628
629    // Calculate totals
630    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
631    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
632
633    // Print table header
634    println!();
635    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
636    println!("{}", "─".repeat(50));
637
638    // Print each database
639    for size in sizes {
640        println!(
641            "{:<20} {:<12} {:<15}",
642            size.name,
643            size.size_human,
644            migration::format_duration(size.estimated_duration)
645        );
646    }
647
648    // Print totals
649    println!("{}", "─".repeat(50));
650    println!(
651        "Total: {} (estimated {})",
652        migration::format_bytes(total_bytes),
653        migration::format_duration(total_duration)
654    );
655    println!();
656
657    // Prompt for confirmation
658    print!("Proceed with replication? [y/N]: ");
659    io::stdout().flush()?;
660
661    let mut input = String::new();
662    io::stdin()
663        .read_line(&mut input)
664        .context("Failed to read user input")?;
665
666    Ok(input.trim().to_lowercase() == "y")
667}
668
669/// Checks if a database is empty (no user tables)
670async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
671    // Need to connect to the specific database to check tables
672    let db_url = replace_database_in_url(target_url, db_name)?;
673    let client = postgres::connect_with_retry(&db_url).await?;
674
675    let query = "
676        SELECT COUNT(*)
677        FROM information_schema.tables
678        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
679    ";
680
681    let row = client.query_one(query, &[]).await?;
682    let count: i64 = row.get(0);
683
684    Ok(count == 0)
685}
686
687/// Prompts user to drop existing database
688fn prompt_drop_database(db_name: &str) -> Result<bool> {
689    use std::io::{self, Write};
690
691    print!(
692        "\nWarning: Database '{}' already exists on target and contains data.\n\
693         Drop and recreate database? This will delete all existing data. [y/N]: ",
694        db_name
695    );
696    io::stdout().flush()?;
697
698    let mut input = String::new();
699    io::stdin().read_line(&mut input)?;
700
701    Ok(input.trim().eq_ignore_ascii_case("y"))
702}
703
704/// Drops a database if it exists
705async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
706    // Validate database name to prevent SQL injection
707    crate::utils::validate_postgres_identifier(db_name)
708        .with_context(|| format!("Invalid database name: '{}'", db_name))?;
709
710    tracing::info!("  Dropping existing database '{}'...", db_name);
711
712    // Terminate existing connections to the database
713    let terminate_query = "
714        SELECT pg_terminate_backend(pid)
715        FROM pg_stat_activity
716        WHERE datname = $1 AND pid <> pg_backend_pid()
717    ";
718    target_conn.execute(terminate_query, &[&db_name]).await?;
719
720    // Drop the database
721    let drop_query = format!(
722        "DROP DATABASE IF EXISTS {}",
723        crate::utils::quote_ident(db_name)
724    );
725    target_conn
726        .execute(&drop_query, &[])
727        .await
728        .with_context(|| format!("Failed to drop database '{}'", db_name))?;
729
730    tracing::info!("  ✓ Database '{}' dropped", db_name);
731    Ok(())
732}
733
734/// Initial replication from SQLite to PostgreSQL
735///
736/// Performs one-time migration of SQLite database to PostgreSQL target using JSONB storage:
737/// 1. Validates SQLite file exists and is readable
738/// 2. Validates PostgreSQL target connection
739/// 3. Lists all tables from SQLite database
740/// 4. For each table:
741///    - Converts rows to JSONB format
742///    - Creates JSONB table in PostgreSQL
743///    - Batch inserts all data
744///
745/// All SQLite data is stored as JSONB with metadata:
746/// - id: Original row ID (from ID column or row number)
747/// - data: Complete row as JSON object
748/// - _source_type: "sqlite"
749/// - _migrated_at: Timestamp of migration
750///
751/// # Arguments
752///
753/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
754/// * `target_url` - PostgreSQL connection string for target (Seren) database
755///
756/// # Returns
757///
758/// Returns `Ok(())` if migration completes successfully.
759///
760/// # Errors
761///
762/// This function will return an error if:
763/// - SQLite file doesn't exist or isn't readable
764/// - Cannot connect to target PostgreSQL database
765/// - Table conversion fails
766/// - Database creation or insert operations fail
767///
768/// # Examples
769///
770/// ```no_run
771/// # use anyhow::Result;
772/// # use database_replicator::commands::init::init_sqlite_to_postgres;
773/// # async fn example() -> Result<()> {
774/// init_sqlite_to_postgres(
775///     "database.db",
776///     "postgresql://user:pass@seren.example.com/targetdb"
777/// ).await?;
778/// # Ok(())
779/// # }
780/// ```
781pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
782    tracing::info!("Starting SQLite to PostgreSQL migration...");
783
784    // Step 1: Validate SQLite file
785    tracing::info!("Step 1/4: Validating SQLite database...");
786    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
787        .context("SQLite file validation failed")?;
788    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());
789
790    // Step 2: Open SQLite connection (read-only)
791    tracing::info!("Step 2/4: Opening SQLite database...");
792    let sqlite_conn =
793        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
794    tracing::info!("  ✓ SQLite database opened (read-only mode)");
795
796    // Step 3: List all tables
797    tracing::info!("Step 3/4: Discovering tables...");
798    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
799        .context("Failed to list tables from SQLite database")?;
800
801    if tables.is_empty() {
802        tracing::warn!("⚠ No tables found in SQLite database");
803        tracing::info!("✅ Migration complete (no tables to migrate)");
804        return Ok(());
805    }
806
807    tracing::info!("Found {} table(s) to migrate", tables.len());
808
809    // Connect to PostgreSQL target
810    let target_client = postgres::connect_with_retry(target_url).await?;
811    tracing::info!("  ✓ Connected to PostgreSQL target");
812
813    // Step 4: Migrate each table
814    tracing::info!("Step 4/4: Migrating tables...");
815    for (idx, table_name) in tables.iter().enumerate() {
816        tracing::info!(
817            "Migrating table {}/{}: '{}'",
818            idx + 1,
819            tables.len(),
820            table_name
821        );
822
823        // Convert SQLite table to JSONB
824        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
825            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
826
827        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
828
829        // Create JSONB table in PostgreSQL
830        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
831            .await
832            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
833
834        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
835
836        if !rows.is_empty() {
837            // Batch insert all rows
838            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
839                .await
840                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
841
842            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
843        } else {
844            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
845        }
846    }
847
848    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
849    tracing::info!(
850        "   Migrated {} table(s) from '{}' to PostgreSQL",
851        tables.len(),
852        sqlite_path
853    );
854
855    Ok(())
856}
857
858/// Initial replication from MongoDB to PostgreSQL
859///
860/// Performs one-time migration of MongoDB database to PostgreSQL target using JSONB storage:
861/// 1. Validates MongoDB connection string
862/// 2. Connects to MongoDB and verifies connection
863/// 3. Extracts database name from connection string
864/// 4. Lists all collections from MongoDB database
865/// 5. For each collection:
866///    - Converts documents to JSONB format
867///    - Creates JSONB table in PostgreSQL
868///    - Batch inserts all data
869///
870/// All MongoDB data is stored as JSONB with metadata:
871/// - id: Original _id field (ObjectId → hex, String/Int → string)
872/// - data: Complete document as JSON object
873/// - _source_type: "mongodb"
874/// - _migrated_at: Timestamp of migration
875///
876/// # Arguments
877///
878/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
879/// * `target_url` - PostgreSQL connection string for target (Seren) database
880///
881/// # Returns
882///
883/// Returns `Ok(())` if migration completes successfully.
884///
885/// # Errors
886///
887/// This function will return an error if:
888/// - MongoDB connection string is invalid
889/// - Cannot connect to MongoDB database
890/// - Database name is not specified in connection string
891/// - Cannot connect to target PostgreSQL database
892/// - Collection conversion fails
893/// - Database creation or insert operations fail
894///
895/// # Examples
896///
897/// ```no_run
898/// # use anyhow::Result;
899/// # use database_replicator::commands::init::init_mongodb_to_postgres;
900/// # async fn example() -> Result<()> {
901/// init_mongodb_to_postgres(
902///     "mongodb://localhost:27017/mydb",
903///     "postgresql://user:pass@seren.example.com/targetdb"
904/// ).await?;
905/// # Ok(())
906/// # }
907/// ```
908pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
909    tracing::info!("Starting MongoDB to PostgreSQL migration...");
910
911    // Step 1: Validate and connect to MongoDB
912    tracing::info!("Step 1/5: Validating MongoDB connection...");
913    let client = crate::mongodb::connect_mongodb(mongo_url)
914        .await
915        .context("MongoDB connection failed")?;
916    tracing::info!("  ✓ MongoDB connection validated");
917
918    // Step 2: Extract database name
919    tracing::info!("Step 2/5: Extracting database name...");
920    let db_name = crate::mongodb::extract_database_name(mongo_url)
921        .await
922        .context("Failed to parse MongoDB connection string")?
923        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
924    tracing::info!("  ✓ Database name: '{}'", db_name);
925
926    // Step 3: List all collections
927    tracing::info!("Step 3/5: Discovering collections...");
928    let db = client.database(&db_name);
929    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
930        .await
931        .context("Failed to list collections from MongoDB database")?;
932
933    if collections.is_empty() {
934        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
935        tracing::info!("✅ Migration complete (no collections to migrate)");
936        return Ok(());
937    }
938
939    tracing::info!("Found {} collection(s) to migrate", collections.len());
940
941    // Step 4: Connect to PostgreSQL target
942    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
943    let target_client = postgres::connect_with_retry(target_url).await?;
944    tracing::info!("  ✓ Connected to PostgreSQL target");
945
946    // Step 5: Migrate each collection
947    tracing::info!("Step 5/5: Migrating collections...");
948    for (idx, collection_name) in collections.iter().enumerate() {
949        tracing::info!(
950            "Migrating collection {}/{}: '{}'",
951            idx + 1,
952            collections.len(),
953            collection_name
954        );
955
956        // Convert MongoDB collection to JSONB
957        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
958            .await
959            .with_context(|| {
960                format!(
961                    "Failed to convert collection '{}' to JSONB",
962                    collection_name
963                )
964            })?;
965
966        tracing::info!(
967            "  ✓ Converted {} documents from '{}'",
968            rows.len(),
969            collection_name
970        );
971
972        // Create JSONB table in PostgreSQL
973        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
974            .await
975            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
976
977        tracing::info!(
978            "  ✓ Created JSONB table '{}' in PostgreSQL",
979            collection_name
980        );
981
982        if !rows.is_empty() {
983            // Batch insert all rows
984            crate::jsonb::writer::insert_jsonb_batch(
985                &target_client,
986                collection_name,
987                rows,
988                "mongodb",
989            )
990            .await
991            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
992
993            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
994        } else {
995            tracing::info!(
996                "  ✓ Collection '{}' is empty (no documents to insert)",
997                collection_name
998            );
999        }
1000    }
1001
1002    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1003    tracing::info!(
1004        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
1005        collections.len(),
1006        db_name
1007    );
1008
1009    Ok(())
1010}
1011
1012/// Initial replication from MySQL to PostgreSQL
1013///
1014/// Performs one-time replication of MySQL database to PostgreSQL target using JSONB storage:
1015/// 1. Validates MySQL connection string
1016/// 2. Connects to MySQL and verifies connection
1017/// 3. Extracts database name from connection string
1018/// 4. Lists all tables from MySQL database
1019/// 5. For each table:
1020///    - Converts rows to JSONB format
1021///    - Creates JSONB table in PostgreSQL
1022///    - Batch inserts all data
1023///
1024/// All MySQL data is stored as JSONB with metadata:
1025/// - id: Primary key or auto-generated ID
1026/// - data: Complete row as JSON object
1027/// - _source_type: "mysql"
1028/// - _migrated_at: Timestamp of replication
1029///
1030/// # Arguments
1031///
1032/// * `mysql_url` - MySQL connection string (mysql://...)
1033/// * `target_url` - PostgreSQL connection string for target (Seren) database
1034///
1035/// # Returns
1036///
1037/// Returns `Ok(())` if replication completes successfully.
1038///
1039/// # Errors
1040///
1041/// This function will return an error if:
1042/// - MySQL connection string is invalid
1043/// - Cannot connect to MySQL database
1044/// - Database name is not specified in connection string
1045/// - Cannot connect to target PostgreSQL database
1046/// - Table conversion fails
1047/// - Database creation or insert operations fail
1048///
1049/// # Examples
1050///
1051/// ```no_run
1052/// # use anyhow::Result;
1053/// # use database_replicator::commands::init::init_mysql_to_postgres;
1054/// # async fn example() -> Result<()> {
1055/// init_mysql_to_postgres(
1056///     "mysql://user:pass@localhost:3306/mydb",
1057///     "postgresql://user:pass@seren.example.com/targetdb"
1058/// ).await?;
1059/// # Ok(())
1060/// # }
1061/// ```
1062pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1063    tracing::info!("Starting MySQL to PostgreSQL replication...");
1064
1065    // Step 1: Validate and connect to MySQL
1066    tracing::info!("Step 1/5: Validating MySQL connection...");
1067    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1068        .await
1069        .context("MySQL connection failed")?;
1070    tracing::info!("  ✓ MySQL connection validated");
1071
1072    // Step 2: Extract database name
1073    tracing::info!("Step 2/5: Extracting database name...");
1074    let db_name = crate::mysql::extract_database_name(mysql_url)
1075        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1076    tracing::info!("  ✓ Database name: '{}'", db_name);
1077
1078    // Step 3: List all tables
1079    tracing::info!("Step 3/5: Discovering tables...");
1080    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1081        .await
1082        .context("Failed to list tables from MySQL database")?;
1083
1084    if tables.is_empty() {
1085        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1086        tracing::info!("✅ Replication complete (no tables to replicate)");
1087        return Ok(());
1088    }
1089
1090    tracing::info!("Found {} table(s) to replicate", tables.len());
1091
1092    // Step 4: Connect to PostgreSQL target
1093    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1094    let target_client = postgres::connect_with_retry(target_url).await?;
1095    tracing::info!("  ✓ Connected to PostgreSQL target");
1096
1097    // Step 5: Replicate each table
1098    tracing::info!("Step 5/5: Replicating tables...");
1099    for (idx, table_name) in tables.iter().enumerate() {
1100        tracing::info!(
1101            "Replicating table {}/{}: '{}'",
1102            idx + 1,
1103            tables.len(),
1104            table_name
1105        );
1106
1107        // Convert MySQL table to JSONB
1108        let rows =
1109            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1110                .await
1111                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1112
1113        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);
1114
1115        // Create JSONB table in PostgreSQL
1116        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1117            .await
1118            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1119
1120        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1121
1122        if !rows.is_empty() {
1123            // Batch insert all rows
1124            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1125                .await
1126                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1127
1128            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
1129        } else {
1130            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
1131        }
1132    }
1133
1134    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1135    tracing::info!(
1136        "   Replicated {} table(s) from database '{}' to PostgreSQL",
1137        tables.len(),
1138        db_name
1139    );
1140
1141    Ok(())
1142}
1143
#[cfg(test)]
mod tests {
    use super::*;

    // Integration test against live databases; #[ignore]d by default.
    // Requires TEST_SOURCE_URL and TEST_TARGET_URL to be set — run with
    // `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        // NOTE(review): the positional bools presumably map to init's flags
        // (skip_confirmation, drop_existing, enable_sync, allow_resume,
        // force_local) — confirm the order against init's signature, which is
        // defined earlier in this file.
        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    // Pure unit test: database-name substitution must preserve query
    // parameters and work with or without them.
    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    // Smoke test against a live target; #[ignore]d by default. Only checks
    // that the call succeeds, not the emptiness verdict itself.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables
        // This test just verifies the function doesn't crash
        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}