database_replicator/commands/init.rs

// ABOUTME: Initial replication command for snapshot schema and data copy
// ABOUTME: Performs full database dump and restore from source to target

use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

/// Initial replication command for snapshot schema and data copy
///
/// Performs a full database dump and restore from source to target in steps:
/// 1. Estimates database sizes and replication times
/// 2. Prompts for confirmation (unless skip_confirmation is true)
/// 3. Dumps global objects (roles, tablespaces) from source
/// 4. Restores global objects to target
/// 5. Discovers all user databases on source
/// 6. Replicates each database (schema and data)
/// 7. Optionally sets up continuous logical replication (if enable_sync is true)
///
/// Uses temporary directory for dump files, which is automatically cleaned up.
///
/// # Arguments
///
/// * `source_url` - PostgreSQL connection string for source (Neon) database
/// * `target_url` - PostgreSQL connection string for target (Seren) database
/// * `skip_confirmation` - Skip the size estimation and confirmation prompt
/// * `filter` - Database and table filtering rules
/// * `drop_existing` - Drop existing databases on target before copying
/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
/// * `allow_resume` - Resume from checkpoint if available (default: true)
///
/// # Returns
///
/// Returns `Ok(())` if replication completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - Cannot create temporary directory
/// - Global objects dump/restore fails
/// - Cannot connect to source database
/// - Database discovery fails
/// - Any database replication fails
/// - User declines confirmation prompt
/// - Logical replication setup fails (if enable_sync is true)
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init;
/// # use database_replicator::filters::ReplicationFilter;
/// # async fn example() -> Result<()> {
/// // With confirmation prompt and automatic sync (default)
/// init(
///     "postgresql://user:pass@neon.tech/sourcedb",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     false,
///     ReplicationFilter::empty(),
///     false,
///     true,  // Enable continuous replication
///     true   // Allow resume
/// ).await?;
///
/// // Snapshot only (no continuous replication)
/// init(
///     "postgresql://user:pass@neon.tech/sourcedb",
///     "postgresql://user:pass@seren.example.com/targetdb",
///     true,
///     ReplicationFilter::empty(),
///     false,
///     false, // Disable continuous replication
///     true   // Allow resume
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Detect source database type and route to appropriate implementation
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            // PostgreSQL to PostgreSQL replication (existing logic below)
            tracing::info!("Source type: PostgreSQL");
        }
        crate::SourceType::SQLite => {
            // SQLite to PostgreSQL migration (simpler path)
            tracing::info!("Source type: SQLite");

            // SQLite migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            // MongoDB to PostgreSQL migration (simpler path)
            tracing::info!("Source type: MongoDB");

            // MongoDB migrations don't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            // MySQL to PostgreSQL replication (simpler path)
            tracing::info!("Source type: MySQL");

            // MySQL replication doesn't support PostgreSQL-specific features
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // CRITICAL: Ensure source and target are different to prevent data loss
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Create managed temporary directory for dump files
    // Unlike TempDir, this survives SIGKILL and is cleaned up on next startup
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: Dump global objects
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;

    // Step 2: Restore global objects
    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: Discover and filter databases
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scope the connection so it's dropped before subprocess operations
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    }; // Connection dropped here

    // Apply filtering rules
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // Try to validate the checkpoint
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Validation succeeded - resume from checkpoint
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Validation failed - log warning and start fresh
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist baseline state so crashes before first database can resume cleanly
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Estimate database sizes and get confirmation
    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            // Scope the connection so it's dropped after size estimation
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        }; // Connection dropped here

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    // Step 4: Replicate each database
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Build connection URLs for this specific database
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Handle database creation atomically to avoid TOCTOU race condition
        // Scope the connection so it's dropped before dump/restore subprocess operations
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate database name to prevent SQL injection
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            // Try to create database atomically (avoids TOCTOU vulnerability)
            let create_query = format!("CREATE DATABASE \"{}\"", db_info.name);
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Check if error is "database already exists" (error code 42P04)
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            // Database already exists - handle based on user preferences
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // Check if empty
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Database exists and has data
                                let should_drop = if drop_existing {
                                    // Force drop with --drop-existing flag
                                    true
                                } else if skip_confirmation {
                                    // Auto-confirm drop with --yes flag (non-interactive)
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    // Interactive mode: prompt user
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    // Recreate the database
                                    let create_query =
                                        format!("CREATE DATABASE \"{}\"", db_info.name);
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Some other database error - propagate it
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Not a database error - propagate it
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        } // Connection dropped here before dump/restore operations

        // Dump and restore schema
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        // Dump and restore data (using directory format for parallel operations)
        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Explicitly clean up temp directory
    // (This runs on normal completion; startup cleanup handles SIGKILL cases)
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
        // Don't fail the entire operation if cleanup fails
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Check wal_level before attempting to set up sync
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            // Scope the connection for quick wal_level check
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        }; // Connection dropped here

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("    1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("    2. Restart PostgreSQL server");
            tracing::warn!(
                "    3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    // Set up continuous logical replication if enabled
    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // Call sync command with the same filter
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("    postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

/// Replace the database name in a connection URL
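///
/// A minimal usage sketch (illustrative only; this helper is private, so the
/// snippet is not compiled as a doctest):
///
/// ```ignore
/// let url = "postgresql://user:pass@host:5432/postgres?sslmode=require";
/// let rewritten = replace_database_in_url(url, "analytics")?;
/// assert_eq!(
///     rewritten,
///     "postgresql://user:pass@host:5432/analytics?sslmode=require"
/// );
/// ```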
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    // Parse URL to find database name
    // Format: postgresql://user:pass@host:port/database?params

    // Split by '?' to separate params
    let parts: Vec<&str> = url.split('?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 {
        Some(parts[1])
    } else {
        None
    };

    // Split base by '/' to get everything before database name
    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    // Reconstruct URL with new database name
    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

/// Display database size estimates and prompt for confirmation
///
/// Shows a table with database names, sizes, and estimated replication times.
/// Prompts the user to proceed with the replication.
///
/// # Arguments
///
/// * `sizes` - Vector of database size estimates
///
/// # Returns
///
/// Returns `true` if user confirms (enters 'y'), `false` otherwise.
///
/// # Errors
///
/// Returns an error if stdin/stdout operations fail.
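///
/// # Examples
///
/// A minimal sketch (assumes `DatabaseSizeInfo` exposes exactly the fields
/// read below: `name`, `size_bytes`, `size_human`, and `estimated_duration`;
/// not compiled as a doctest since this helper reads stdin):
///
/// ```ignore
/// use std::time::Duration;
///
/// // Hypothetical estimate; real values come from `migration::estimate_database_sizes`.
/// let sizes = vec![migration::DatabaseSizeInfo {
///     name: "app_db".to_string(),
///     size_bytes: 1_073_741_824,
///     size_human: "1.0 GB".to_string(),
///     estimated_duration: Duration::from_secs(300),
/// }];
///
/// if confirm_replication(&sizes)? {
///     // user typed 'y' at the prompt; proceed with replication
/// }
/// ```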
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    // Calculate totals
    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    // Print table header
    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    // Print each database
    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    // Print totals
    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    // Prompt for confirmation
    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

/// Checks if a database is empty (no user tables)
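///
/// Connects to the named database on the target and counts user tables in
/// `information_schema.tables`. A minimal async sketch (the connection string
/// is illustrative; not compiled as a doctest):
///
/// ```ignore
/// if database_is_empty("postgresql://user:pass@host:5432/postgres", "app_db").await? {
///     // No user tables: safe to restore into 'app_db' without dropping it
/// }
/// ```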
async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
    // Need to connect to the specific database to check tables
    let db_url = replace_database_in_url(target_url, db_name)?;
    let client = postgres::connect_with_retry(&db_url).await?;

    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = client.query_one(query, &[]).await?;
    let count: i64 = row.get(0);

    Ok(count == 0)
}

/// Prompts user to drop existing database
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    use std::io::{self, Write};

    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

/// Drops a database if it exists
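///
/// Terminates any active connections to the database before dropping it.
/// A minimal async sketch (illustrative; the connection must be to a
/// maintenance database on the target, not to the database being dropped):
///
/// ```ignore
/// let target_client = postgres::connect_with_retry(target_url).await?;
/// drop_database_if_exists(&target_client, "stale_db").await?;
/// ```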
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Validate database name to prevent SQL injection
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!("  Dropping existing database '{}'...", db_name);

    // Terminate existing connections to the database
    let terminate_query = "
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = $1 AND pid <> pg_backend_pid()
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Drop the database
    let drop_query = format!("DROP DATABASE IF EXISTS \"{}\"", db_name);
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!("  ✓ Database '{}' dropped", db_name);
    Ok(())
}

/// Initial replication from SQLite to PostgreSQL
///
/// Performs a one-time migration of a SQLite database to the PostgreSQL target using JSONB storage:
/// 1. Validates SQLite file exists and is readable
/// 2. Validates PostgreSQL target connection
/// 3. Lists all tables from SQLite database
/// 4. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All SQLite data is stored as JSONB with metadata:
/// - id: Original row ID (from ID column or row number)
/// - data: Complete row as JSON object
/// - _source_type: "sqlite"
/// - _migrated_at: Timestamp of migration
///
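/// Conceptually, each migrated row carries the pieces of information listed
/// above. An illustrative sketch only (shown as a single JSON document for
/// readability; the actual column layout is defined by
/// `jsonb::writer::create_jsonb_table`):
///
/// ```ignore
/// let row = serde_json::json!({
///     "id": "42",                                   // original row ID or row number
///     "data": { "name": "Alice", "active": true },  // complete row as JSON
///     "_source_type": "sqlite",
///     "_migrated_at": "2024-01-01T00:00:00Z"
/// });
/// ```
///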
/// # Arguments
///
/// * `sqlite_path` - Path to SQLite database file (.db, .sqlite, or .sqlite3)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - SQLite file doesn't exist or isn't readable
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_sqlite_to_postgres;
/// # async fn example() -> Result<()> {
/// init_sqlite_to_postgres(
///     "database.db",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    // Step 1: Validate SQLite file
    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());

    // Step 2: Open SQLite connection (read-only)
    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!("  ✓ SQLite database opened (read-only mode)");

    // Step 3: List all tables
    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    // Connect to PostgreSQL target
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 4: Migrate each table
    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert SQLite table to JSONB
        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}

/// Initial replication from MongoDB to PostgreSQL
///
/// Performs a one-time migration of a MongoDB database to the PostgreSQL target using JSONB storage:
/// 1. Validates MongoDB connection string
/// 2. Connects to MongoDB and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all collections from MongoDB database
/// 5. For each collection:
///    - Converts documents to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MongoDB data is stored as JSONB with metadata:
/// - id: Original _id field (ObjectId → hex, String/Int → string)
/// - data: Complete document as JSON object
/// - _source_type: "mongodb"
/// - _migrated_at: Timestamp of migration
///
/// # Arguments
///
/// * `mongo_url` - MongoDB connection string (mongodb:// or mongodb+srv://)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if migration completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MongoDB connection string is invalid
/// - Cannot connect to MongoDB database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Collection conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mongodb_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mongodb_to_postgres(
///     "mongodb://localhost:27017/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    // Step 1: Validate and connect to MongoDB
    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!("  ✓ MongoDB connection validated");

    // Step 2: Extract database name
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all collections
    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Migrate each collection
    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // Convert MongoDB collection to JSONB
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            "  ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            "  ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                "  ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        "   Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

/// Initial replication from MySQL to PostgreSQL
///
/// Performs a one-time replication of a MySQL database to the PostgreSQL target using JSONB storage:
/// 1. Validates MySQL connection string
/// 2. Connects to MySQL and verifies connection
/// 3. Extracts database name from connection string
/// 4. Lists all tables from MySQL database
/// 5. For each table:
///    - Converts rows to JSONB format
///    - Creates JSONB table in PostgreSQL
///    - Batch inserts all data
///
/// All MySQL data is stored as JSONB with metadata:
/// - id: Primary key or auto-generated ID
/// - data: Complete row as JSON object
/// - _source_type: "mysql"
/// - _migrated_at: Timestamp of replication
///
/// # Arguments
///
/// * `mysql_url` - MySQL connection string (mysql://...)
/// * `target_url` - PostgreSQL connection string for target (Seren) database
///
/// # Returns
///
/// Returns `Ok(())` if replication completes successfully.
///
/// # Errors
///
/// This function will return an error if:
/// - MySQL connection string is invalid
/// - Cannot connect to MySQL database
/// - Database name is not specified in connection string
/// - Cannot connect to target PostgreSQL database
/// - Table conversion fails
/// - Database creation or insert operations fail
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::commands::init::init_mysql_to_postgres;
/// # async fn example() -> Result<()> {
/// init_mysql_to_postgres(
///     "mysql://user:pass@localhost:3306/mydb",
///     "postgresql://user:pass@seren.example.com/targetdb"
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    // Step 1: Validate and connect to MySQL
    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!("  ✓ MySQL connection validated");

    // Step 2: Extract database name
    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    // Step 3: List all tables
    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    // Step 4: Connect to PostgreSQL target
    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    // Step 5: Replicate each table
    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Convert MySQL table to JSONB
        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create JSONB table in PostgreSQL
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            // Batch insert all rows
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        "   Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Skip confirmation for automated tests, disable sync to keep test simple
        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // postgres database might be empty of user tables
        // This test just verifies the function doesn't crash
        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}