use crate::migration::dump::remove_restricted_role_grants;
use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

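/// Performs the initial (snapshot) replication from `source_url` to `target_url`.
///
/// The source type is auto-detected. SQLite, MongoDB, and MySQL sources are
/// handed off to their one-time JSONB migration paths; PostgreSQL sources run
/// the full pipeline: pre-flight checks, globals dump/restore, per-database
/// schema and data copy, and, when `enable_sync` is set and the target's
/// wal_level is `logical`, continuous replication setup. Progress is
/// checkpointed so an interrupted run can resume when `allow_resume` is set.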
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

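    // Only a PostgreSQL source reaches this point; the SQLite, MongoDB, and
    // MySQL arms above have already returned via their one-time paths.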
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

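    // Resume handling: an existing checkpoint is reused only when its metadata
    // (URLs, filter fingerprint, flags) and database list still match this run;
    // on a mismatch it is discarded and a fresh checkpoint is created. With
    // resume disabled, any previous checkpoint is always removed first.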
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

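    // Per-database pipeline: ensure the database exists on the target (creating
    // it, or optionally dropping and recreating a non-empty one), dump/restore
    // the schema, dump/restore the data, then copy any predicate-filtered
    // tables. The checkpoint is saved after each database completes.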
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

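        // This block scopes the maintenance connection, which is only needed
        // while creating (or dropping and recreating) the database on the target.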
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            let is_empty = {
                                let db_client =
                                    postgres::connect_with_retry(&target_db_url).await?;
                                database_is_empty(&db_client).await?
                            };

                            if is_empty {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

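    // Continuous sync requires wal_level = logical on the target. If it is set
    // to anything else, degrade to snapshot-only mode rather than failing here.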
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

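/// Replaces the database name in a connection URL, preserving any query
/// parameters (e.g. `?sslmode=require`).
///
/// Parsing is deliberately simple: everything after the last `/` in the
/// pre-`?` portion is treated as the database name, and a URL containing no
/// `/` at all is rejected as malformed.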
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    let parts: Vec<&str> = url.split('?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 { Some(parts[1]) } else { None };

    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

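/// Prints a per-database size and estimated-duration table, then prompts the
/// user to proceed. Returns `Ok(true)` only for a `y`/`Y` answer; anything
/// else (including an empty line) is treated as "no".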
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

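/// Returns `true` when the connected database contains no tables outside the
/// `pg_catalog` and `information_schema` schemas. The catalog query is capped
/// at 30 seconds so a hung connection cannot stall the run indefinitely.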
async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        client.query_one(query, &[]),
    )
    .await
    .context("database_is_empty query timed out after 30 seconds")?
    .context("Failed to query information_schema.tables")?;

    let count: i64 = row.get(0);

    Ok(count == 0)
}

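/// Asks the user whether an existing, non-empty database on the target should
/// be dropped and recreated. Defaults to "no" for any answer other than `y`/`Y`.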
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    use std::io::{self, Write};

    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

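/// Drops a database on the target, first terminating any non-superuser
/// sessions connected to it. If connections remain afterwards (typically
/// superuser sessions on managed services), the function bails with guidance
/// rather than attempting to drop a database that is still in use.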
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
        AND sa.pid <> pg_backend_pid()
        AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
        AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
             processes maintain superuser connections.\n\n\
             To resolve this:\n\
             1. Wait a few minutes and retry (system connections may be temporary)\n\
             2. Ask your database administrator to terminate the blocking sessions:\n\
                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}

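/// One-time SQLite-to-PostgreSQL migration: every table in the SQLite file is
/// converted to JSONB rows and loaded into a matching JSONB table on the
/// target. The SQLite database is opened read-only, so the source is never
/// modified.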
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}

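/// One-time MongoDB-to-PostgreSQL migration: each collection in the database
/// named by the connection string is converted to JSONB rows and loaded into
/// a matching JSONB table on the target.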
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

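/// One-time MySQL-to-PostgreSQL replication: each table in the database named
/// by the connection string is converted to JSONB rows and loaded into a
/// matching JSONB table on the target.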
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }
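
    // A sketch of a negative-path test (not in the original suite): an input
    // with no '/' separator has no database segment for replace_database_in_url
    // to swap, so it should surface the "Invalid connection URL format" error.
    #[test]
    fn test_replace_database_in_url_rejects_malformed_url() {
        let result = replace_database_in_url("not-a-connection-url", "newdb");
        assert!(result.is_err());
    }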

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        let result = database_is_empty(&client).await;
        assert!(result.is_ok());
    }
}