1use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
10#[allow(clippy::too_many_arguments)]
83pub async fn init(
84 source_url: &str,
85 target_url: &str,
86 skip_confirmation: bool,
87 filter: crate::filters::ReplicationFilter,
88 drop_existing: bool,
89 enable_sync: bool,
90 allow_resume: bool,
91 force_local: bool,
92) -> Result<()> {
93 tracing::info!("Starting initial replication...");
94
95 let source_type =
97 crate::detect_source_type(source_url).context("Failed to detect source database type")?;
98
99 match source_type {
100 crate::SourceType::PostgreSQL => {
101 tracing::info!("Source type: PostgreSQL");
103
104 tracing::info!("Running pre-flight checks...");
106
107 let databases = filter.databases_to_check();
109 let preflight_result = crate::preflight::run_preflight_checks(
110 source_url,
111 target_url,
112 databases.as_deref(),
113 )
114 .await?;
115
116 preflight_result.print();
117
118 if !preflight_result.all_passed() {
119 if preflight_result.tool_version_incompatible
121 && crate::utils::is_serendb_target(target_url)
122 && !force_local
123 {
124 println!();
125 tracing::info!(
126 "Tool version incompatible. Switching to SerenAI cloud execution..."
127 );
128 bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
130 }
131
132 if force_local {
134 bail!(
135 "Pre-flight checks failed. Cannot continue with --local flag.\n\
136 Fix the issues above or remove --local to allow remote execution."
137 );
138 }
139
140 bail!("Pre-flight checks failed. Fix the issues above and retry.");
141 }
142
143 println!();
144 }
145 crate::SourceType::SQLite => {
146 tracing::info!("Source type: SQLite");
148
149 if !filter.is_empty() {
151 tracing::warn!(
152 "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
153 );
154 }
155 if drop_existing {
156 tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
157 }
158 if !enable_sync {
159 tracing::warn!(
160 "⚠ SQLite sources don't support continuous replication (one-time migration only)"
161 );
162 }
163
164 return init_sqlite_to_postgres(source_url, target_url).await;
165 }
166 crate::SourceType::MongoDB => {
167 tracing::info!("Source type: MongoDB");
169
170 if !filter.is_empty() {
172 tracing::warn!(
173 "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
174 );
175 }
176 if drop_existing {
177 tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
178 }
179 if !enable_sync {
180 tracing::warn!(
181 "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
182 );
183 }
184
185 return init_mongodb_to_postgres(source_url, target_url).await;
186 }
187 crate::SourceType::MySQL => {
188 tracing::info!("Source type: MySQL");
190
191 if !filter.is_empty() {
193 tracing::warn!(
194 "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
195 );
196 }
197 if drop_existing {
198 tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
199 }
200 if !enable_sync {
201 tracing::warn!(
202 "⚠ MySQL sources don't support continuous replication (one-time replication only)"
203 );
204 }
205
206 return init_mysql_to_postgres(source_url, target_url).await;
207 }
208 }
209
210 crate::utils::validate_source_target_different(source_url, target_url)
212 .context("Source and target validation failed")?;
213 tracing::info!("✓ Verified source and target are different databases");
214
215 let temp_path =
218 crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
219 tracing::debug!("Using temp directory: {}", temp_path.display());
220
221 let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
222 .context("Failed to determine checkpoint location")?;
223
224 tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
226 let globals_file = temp_path.join("globals.sql");
227 migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
228 migration::sanitize_globals_dump(globals_file.to_str().unwrap())
229 .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
230 migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
231 .context("Failed to remove SUPERUSER from globals dump")?;
232 migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
233 .context("Failed to remove restricted parameter settings from globals dump")?;
234 remove_restricted_role_grants(globals_file.to_str().unwrap())
235 .context("Failed to remove restricted role grants from globals dump")?;
236 migration::remove_tablespace_statements(globals_file.to_str().unwrap())
237 .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;
238
239 tracing::info!("Step 2/4: Restoring global objects to target...");
241 migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;
242
243 tracing::info!("Step 3/4: Discovering databases...");
245 let all_databases = {
246 let source_client = postgres::connect_with_retry(source_url).await?;
248 migration::list_databases(&source_client).await?
249 }; let databases: Vec<_> = all_databases
253 .into_iter()
254 .filter(|db| filter.should_replicate_database(&db.name))
255 .collect();
256
257 if databases.is_empty() {
258 let _ = checkpoint::remove_checkpoint(&checkpoint_path);
259 if filter.is_empty() {
260 tracing::warn!("⚠ No user databases found on source");
261 tracing::warn!(" This is unusual - the source database appears empty");
262 tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
263 } else {
264 tracing::warn!("⚠ No databases matched the filter criteria");
265 tracing::warn!(" Check your --include-databases or --exclude-databases settings");
266 }
267 tracing::info!("✅ Initial replication complete (no databases to replicate)");
268 return Ok(());
269 }
270
271 let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
272 let filter_hash = filter.fingerprint();
273 let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
274 source_url,
275 target_url,
276 filter_hash,
277 drop_existing,
278 enable_sync,
279 );
280
281 let mut checkpoint_state = if allow_resume {
282 match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
283 Some(existing) => {
284 match existing.validate(&checkpoint_metadata, &database_names) {
286 Ok(()) => {
287 if existing.completed_count() > 0 {
289 tracing::info!(
290 "Resume checkpoint found: {}/{} databases already replicated",
291 existing.completed_count(),
292 existing.total_databases()
293 );
294 } else {
295 tracing::info!(
296 "Resume checkpoint found but no databases marked complete yet"
297 );
298 }
299 existing
300 }
301 Err(e) => {
302 tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
304 tracing::warn!(
305 " Previous run configuration differs from current configuration"
306 );
307 tracing::warn!(" - Schema-only tables may have changed");
308 tracing::warn!(" - Time filters may have changed");
309 tracing::warn!(" - Table selection may have changed");
310 tracing::warn!(" Error: {}", e);
311 tracing::info!("");
312 tracing::info!(
313 "✓ Automatically discarding old checkpoint and starting fresh"
314 );
315 checkpoint::remove_checkpoint(&checkpoint_path)?;
316 checkpoint::InitCheckpoint::new(
317 checkpoint_metadata.clone(),
318 &database_names,
319 )
320 }
321 }
322 }
323 None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
324 }
325 } else {
326 if checkpoint_path.exists() {
327 tracing::info!(
328 "--no-resume supplied: discarding previous checkpoint at {}",
329 checkpoint_path.display()
330 );
331 }
332 checkpoint::remove_checkpoint(&checkpoint_path)?;
333 checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
334 };
335
336 checkpoint_state
338 .save(&checkpoint_path)
339 .context("Failed to persist checkpoint state")?;
340
341 tracing::info!("Found {} database(s) to replicate", databases.len());
342
343 if !skip_confirmation {
345 tracing::info!("Analyzing database sizes...");
346 let size_estimates = {
347 let source_client = postgres::connect_with_retry(source_url).await?;
349 migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
350 .await?
351 }; if !confirm_replication(&size_estimates)? {
354 bail!("Replication cancelled by user");
355 }
356 }
357
358 tracing::info!("Step 4/4: Replicating databases...");
360 for (idx, db_info) in databases.iter().enumerate() {
361 let filtered_tables = filter.predicate_tables(&db_info.name);
362 if checkpoint_state.is_completed(&db_info.name) {
363 tracing::info!(
364 "Skipping database '{}' (already completed per checkpoint)",
365 db_info.name
366 );
367 continue;
368 }
369 tracing::info!(
370 "Replicating database {}/{}: '{}'",
371 idx + 1,
372 databases.len(),
373 db_info.name
374 );
375
376 let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
378 let target_db_url = replace_database_in_url(target_url, &db_info.name)?;
379
380 {
383 let target_client = postgres::connect_with_retry(target_url).await?;
384
385 crate::utils::validate_postgres_identifier(&db_info.name)
387 .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;
388
389 let create_query = format!(
391 "CREATE DATABASE {}",
392 crate::utils::quote_ident(&db_info.name)
393 );
394 match target_client.execute(&create_query, &[]).await {
395 Ok(_) => {
396 tracing::info!(" Created database '{}'", db_info.name);
397 }
398 Err(err) => {
399 if let Some(db_error) = err.as_db_error() {
401 if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
402 tracing::info!(
404 " Database '{}' already exists on target",
405 db_info.name
406 );
407
408 if database_is_empty(&target_client).await? {
410 tracing::info!(
411 " Database '{}' is empty, proceeding with restore",
412 db_info.name
413 );
414 } else {
415 let should_drop = if drop_existing {
417 true
419 } else if skip_confirmation {
420 tracing::info!(
422 " Auto-confirming drop for database '{}' (--yes flag)",
423 db_info.name
424 );
425 true
426 } else {
427 prompt_drop_database(&db_info.name)?
429 };
430
431 if should_drop {
432 drop_database_if_exists(&target_client, &db_info.name).await?;
433
434 let create_query = format!(
436 "CREATE DATABASE {}",
437 crate::utils::quote_ident(&db_info.name)
438 );
439 target_client
440 .execute(&create_query, &[])
441 .await
442 .with_context(|| {
443 format!(
444 "Failed to create database '{}' after drop",
445 db_info.name
446 )
447 })?;
448 tracing::info!(" Created database '{}'", db_info.name);
449 } else {
450 bail!("Aborted: Database '{}' already exists", db_info.name);
451 }
452 }
453 } else {
454 return Err(err).with_context(|| {
456 format!("Failed to create database '{}'", db_info.name)
457 });
458 }
459 } else {
460 return Err(err).with_context(|| {
462 format!("Failed to create database '{}'", db_info.name)
463 });
464 }
465 }
466 }
467 } tracing::info!(" Dumping schema for '{}'...", db_info.name);
471 let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
472 migration::dump_schema(
473 &source_db_url,
474 &db_info.name,
475 schema_file.to_str().unwrap(),
476 &filter,
477 )
478 .await?;
479
480 tracing::info!(" Restoring schema for '{}'...", db_info.name);
481 migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;
482
483 tracing::info!(" Dumping data for '{}'...", db_info.name);
485 let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
486 migration::dump_data(
487 &source_db_url,
488 &db_info.name,
489 data_dir.to_str().unwrap(),
490 &filter,
491 )
492 .await?;
493
494 tracing::info!(" Restoring data for '{}'...", db_info.name);
495 migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;
496
497 if !filtered_tables.is_empty() {
498 tracing::info!(
499 " Applying filtered replication for {} table(s)...",
500 filtered_tables.len()
501 );
502 migration::filtered::copy_filtered_tables(
503 &source_db_url,
504 &target_db_url,
505 &filtered_tables,
506 )
507 .await?;
508 }
509
510 tracing::info!("✓ Database '{}' replicated successfully", db_info.name);
511
512 checkpoint_state.mark_completed(&db_info.name);
513 checkpoint_state
514 .save(&checkpoint_path)
515 .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
516 }
517
518 if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
521 tracing::warn!("Failed to clean up temp directory: {}", e);
522 }
524
525 if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
526 tracing::warn!("Failed to remove checkpoint state: {}", err);
527 }
528
529 tracing::info!("✅ Initial replication complete");
530
531 let mut should_enable_sync = enable_sync;
533 if enable_sync {
534 tracing::info!("Checking target wal_level for logical replication...");
535 let target_wal_level = {
536 let target_client = postgres::connect_with_retry(target_url).await?;
538 postgres::check_wal_level(&target_client).await?
539 }; if target_wal_level != "logical" {
542 tracing::warn!("");
543 tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
544 tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
545 tracing::warn!("");
546 tracing::warn!(" To fix this:");
547 tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
548 tracing::warn!(" 2. Restart PostgreSQL server");
549 tracing::warn!(
550 " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
551 );
552 tracing::warn!("");
553 tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
554 should_enable_sync = false;
555 }
556 }
557
558 if should_enable_sync {
560 tracing::info!("");
561 tracing::info!("========================================");
562 tracing::info!("Step 5/5: Setting up continuous replication...");
563 tracing::info!("========================================");
564 tracing::info!("");
565
566 crate::commands::sync(
568 source_url,
569 target_url,
570 Some(filter),
571 None,
572 None,
573 None,
574 false,
575 )
576 .await
577 .context("Failed to set up continuous replication")?;
578
579 tracing::info!("");
580 tracing::info!("✅ Complete! Snapshot and continuous replication are active");
581 } else {
582 tracing::info!("");
583 tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
584 tracing::info!(" To enable it later, run:");
585 tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
586 }
587
588 Ok(())
589}
590
591fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593 let parts: Vec<&str> = url.split('?').collect();
598 let base_url = parts[0];
599 let params = if parts.len() > 1 {
600 Some(parts[1])
601 } else {
602 None
603 };
604
605 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607 if url_parts.len() != 2 {
608 anyhow::bail!("Invalid connection URL format");
609 }
610
611 let mut new_url = format!("{}/{}", url_parts[1], new_database);
613 if let Some(p) = params {
614 new_url = format!("{}?{}", new_url, p);
615 }
616
617 Ok(new_url)
618}
619
620fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637 use std::time::Duration;
638
639 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643 println!();
645 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646 println!("{}", "─".repeat(50));
647
648 for size in sizes {
650 println!(
651 "{:<20} {:<12} {:<15}",
652 size.name,
653 size.size_human,
654 migration::format_duration(size.estimated_duration)
655 );
656 }
657
658 println!("{}", "─".repeat(50));
660 println!(
661 "Total: {} (estimated {})",
662 migration::format_bytes(total_bytes),
663 migration::format_duration(total_duration)
664 );
665 println!();
666
667 print!("Proceed with replication? [y/N]: ");
669 io::stdout().flush()?;
670
671 let mut input = String::new();
672 io::stdin()
673 .read_line(&mut input)
674 .context("Failed to read user input")?;
675
676 Ok(input.trim().to_lowercase() == "y")
677}
678
679async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
684 let query = "
685 SELECT COUNT(*)
686 FROM information_schema.tables
687 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
688 ";
689
690 let row = client.query_one(query, &[]).await?;
691 let count: i64 = row.get(0);
692
693 Ok(count == 0)
694}
695
696fn prompt_drop_database(db_name: &str) -> Result<bool> {
698 use std::io::{self, Write};
699
700 print!(
701 "\nWarning: Database '{}' already exists on target and contains data.\n\
702 Drop and recreate database? This will delete all existing data. [y/N]: ",
703 db_name
704 );
705 io::stdout().flush()?;
706
707 let mut input = String::new();
708 io::stdin().read_line(&mut input)?;
709
710 Ok(input.trim().eq_ignore_ascii_case("y"))
711}
712
/// Drops database `db_name` on the target cluster, first terminating any
/// non-superuser client sessions connected to it.
///
/// `target_conn` must be connected to a *different* database (e.g. the
/// maintenance database), since PostgreSQL cannot drop the database the
/// session itself is connected to.
///
/// # Errors
/// Fails when the name is not a valid identifier, when unterminatable
/// (likely superuser) sessions remain on the database, or when the
/// `DROP DATABASE` statement itself fails.
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Validate before interpolating the identifier into SQL below.
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    // Terminate every non-superuser backend on the database. Superuser
    // backends are excluded because a regular role cannot kill them
    // (pg_terminate_backend would just fail or be a no-op).
    let terminate_query = "
    SELECT pg_terminate_backend(sa.pid)
    FROM pg_stat_activity sa
    JOIN pg_roles r ON sa.usename = r.rolname
    WHERE sa.datname = $1
    AND sa.pid <> pg_backend_pid()
    AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Re-check: any session still attached (typically superuser/managed
    // service connections) will make DROP DATABASE fail, so detect it now
    // and produce an actionable error instead.
    let remaining_query = "
    SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
    FROM pg_stat_activity sa
    WHERE sa.datname = $1
    AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    // NULL when no rows were aggregated, hence Option.
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
            These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
            This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
            processes maintain superuser connections.\n\n\
            To resolve this:\n\
            1. Wait a few minutes and retry (system connections may be temporary)\n\
            2. Ask your database administrator to terminate the blocking sessions:\n\
            SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
            3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    // Identifier is quoted (validated above); DROP cannot be parameterized.
    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}
779
780pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
828 tracing::info!("Starting SQLite to PostgreSQL migration...");
829
830 tracing::info!("Step 1/4: Validating SQLite database...");
832 let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
833 .context("SQLite file validation failed")?;
834 tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());
835
836 tracing::info!("Step 2/4: Opening SQLite database...");
838 let sqlite_conn =
839 crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
840 tracing::info!(" ✓ SQLite database opened (read-only mode)");
841
842 tracing::info!("Step 3/4: Discovering tables...");
844 let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
845 .context("Failed to list tables from SQLite database")?;
846
847 if tables.is_empty() {
848 tracing::warn!("⚠ No tables found in SQLite database");
849 tracing::info!("✅ Migration complete (no tables to migrate)");
850 return Ok(());
851 }
852
853 tracing::info!("Found {} table(s) to migrate", tables.len());
854
855 let target_client = postgres::connect_with_retry(target_url).await?;
857 tracing::info!(" ✓ Connected to PostgreSQL target");
858
859 tracing::info!("Step 4/4: Migrating tables...");
861 for (idx, table_name) in tables.iter().enumerate() {
862 tracing::info!(
863 "Migrating table {}/{}: '{}'",
864 idx + 1,
865 tables.len(),
866 table_name
867 );
868
869 let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
871 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
872
873 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
874
875 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
877 .await
878 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
879
880 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
881
882 if !rows.is_empty() {
883 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
885 .await
886 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
887
888 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
889 } else {
890 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
891 }
892 }
893
894 tracing::info!("✅ SQLite to PostgreSQL migration complete!");
895 tracing::info!(
896 " Migrated {} table(s) from '{}' to PostgreSQL",
897 tables.len(),
898 sqlite_path
899 );
900
901 Ok(())
902}
903
904pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
955 tracing::info!("Starting MongoDB to PostgreSQL migration...");
956
957 tracing::info!("Step 1/5: Validating MongoDB connection...");
959 let client = crate::mongodb::connect_mongodb(mongo_url)
960 .await
961 .context("MongoDB connection failed")?;
962 tracing::info!(" ✓ MongoDB connection validated");
963
964 tracing::info!("Step 2/5: Extracting database name...");
966 let db_name = crate::mongodb::extract_database_name(mongo_url)
967 .await
968 .context("Failed to parse MongoDB connection string")?
969 .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
970 tracing::info!(" ✓ Database name: '{}'", db_name);
971
972 tracing::info!("Step 3/5: Discovering collections...");
974 let db = client.database(&db_name);
975 let collections = crate::mongodb::reader::list_collections(&client, &db_name)
976 .await
977 .context("Failed to list collections from MongoDB database")?;
978
979 if collections.is_empty() {
980 tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
981 tracing::info!("✅ Migration complete (no collections to migrate)");
982 return Ok(());
983 }
984
985 tracing::info!("Found {} collection(s) to migrate", collections.len());
986
987 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
989 let target_client = postgres::connect_with_retry(target_url).await?;
990 tracing::info!(" ✓ Connected to PostgreSQL target");
991
992 tracing::info!("Step 5/5: Migrating collections...");
994 for (idx, collection_name) in collections.iter().enumerate() {
995 tracing::info!(
996 "Migrating collection {}/{}: '{}'",
997 idx + 1,
998 collections.len(),
999 collection_name
1000 );
1001
1002 let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
1004 .await
1005 .with_context(|| {
1006 format!(
1007 "Failed to convert collection '{}' to JSONB",
1008 collection_name
1009 )
1010 })?;
1011
1012 tracing::info!(
1013 " ✓ Converted {} documents from '{}'",
1014 rows.len(),
1015 collection_name
1016 );
1017
1018 crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
1020 .await
1021 .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
1022
1023 tracing::info!(
1024 " ✓ Created JSONB table '{}' in PostgreSQL",
1025 collection_name
1026 );
1027
1028 if !rows.is_empty() {
1029 crate::jsonb::writer::insert_jsonb_batch(
1031 &target_client,
1032 collection_name,
1033 rows,
1034 "mongodb",
1035 )
1036 .await
1037 .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1038
1039 tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
1040 } else {
1041 tracing::info!(
1042 " ✓ Collection '{}' is empty (no documents to insert)",
1043 collection_name
1044 );
1045 }
1046 }
1047
1048 tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1049 tracing::info!(
1050 " Migrated {} collection(s) from database '{}' to PostgreSQL",
1051 collections.len(),
1052 db_name
1053 );
1054
1055 Ok(())
1056}
1057
1058pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1109 tracing::info!("Starting MySQL to PostgreSQL replication...");
1110
1111 tracing::info!("Step 1/5: Validating MySQL connection...");
1113 let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1114 .await
1115 .context("MySQL connection failed")?;
1116 tracing::info!(" ✓ MySQL connection validated");
1117
1118 tracing::info!("Step 2/5: Extracting database name...");
1120 let db_name = crate::mysql::extract_database_name(mysql_url)
1121 .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1122 tracing::info!(" ✓ Database name: '{}'", db_name);
1123
1124 tracing::info!("Step 3/5: Discovering tables...");
1126 let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1127 .await
1128 .context("Failed to list tables from MySQL database")?;
1129
1130 if tables.is_empty() {
1131 tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1132 tracing::info!("✅ Replication complete (no tables to replicate)");
1133 return Ok(());
1134 }
1135
1136 tracing::info!("Found {} table(s) to replicate", tables.len());
1137
1138 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1140 let target_client = postgres::connect_with_retry(target_url).await?;
1141 tracing::info!(" ✓ Connected to PostgreSQL target");
1142
1143 tracing::info!("Step 5/5: Replicating tables...");
1145 for (idx, table_name) in tables.iter().enumerate() {
1146 tracing::info!(
1147 "Replicating table {}/{}: '{}'",
1148 idx + 1,
1149 tables.len(),
1150 table_name
1151 );
1152
1153 let rows =
1155 crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1156 .await
1157 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1158
1159 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
1160
1161 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1163 .await
1164 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1165
1166 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1167
1168 if !rows.is_empty() {
1169 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1171 .await
1172 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1173
1174 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
1175 } else {
1176 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
1177 }
1178 }
1179
1180 tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1181 tracing::info!(
1182 " Replicated {} table(s) from database '{}' to PostgreSQL",
1183 tables.len(),
1184 db_name
1185 );
1186
1187 Ok(())
1188}
1189
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end init against live databases. Requires TEST_SOURCE_URL and
    /// TEST_TARGET_URL, so it is `#[ignore]`d by default.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        // skip_confirmation=true, drop_existing=false, enable_sync=false,
        // allow_resume=true, force_local=false.
        let outcome = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    /// Pure-function check: the database component is swapped while the
    /// query string (when present) is preserved.
    #[test]
    fn test_replace_database_in_url() {
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let without_params = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(without_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    /// Smoke test against a live target; `#[ignore]`d by default.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        let emptiness = database_is_empty(&client).await;
        assert!(emptiness.is_ok());
    }
}